METRON-2168 Elasticsearch Updates Not Tested in Integration Test (nickwallen) closes apache/metron#1451
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java index 95d27db..8044d62 100644 --- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java +++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -43,6 +43,14 @@ import static org.elasticsearch.index.query.QueryBuilders.termsQuery; import static org.elasticsearch.index.query.QueryBuilders.typeQuery; +/** + * A {@link RetrieveLatestDao} that retrieves documents from Elasticsearch. + * + * The index being searched must have a field "guid" of type "keyword", + * otherwise no documents can be found by "guid". + * + * See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-term-query.html + */ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { private ElasticsearchClient client;
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java index 6489206..16760d2 100644 --- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -19,9 +19,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Iterables; +import org.adrianwalker.multilinestring.Multiline; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.hbase.mock.MockHBaseTableProvider; @@ -29,17 +35,21 @@ import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.HBaseDao; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.MultiIndexDao; import org.apache.metron.indexing.dao.UpdateIntegrationTest; import org.apache.metron.integration.UnableToStartException; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Response; +import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -55,11 +65,27 @@ private static final String TABLE_NAME = "modifications"; private static final String CF = "p"; private static MockHTable table; - private static IndexDao hbaseDao; private static IndexDao elasticsearchDao; private static AccessConfig accessConfig; private static Map<String, Object> globalConfig; + /** + * { + * "template": "test*", + * "mappings": { + * "test_doc": { + * "properties": { + * "guid": { + * "type": "keyword" + * } + * } + * } + * } + * } + */ + @Multiline + private static String indexTemplate; + @Override protected String getIndexName() { return SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date()); @@ -90,15 +116,16 @@ .withAccessConfig(accessConfig) .build(); es.start(); + + installIndexTemplate(); } @Before public void setup() { - hbaseDao = new HBaseDao(); - elasticsearchDao = new ElasticsearchDao(); - MultiIndexDao dao = new MultiIndexDao(hbaseDao, elasticsearchDao); - dao.init(accessConfig); - setDao(dao); + elasticsearchDao = new ElasticsearchDao() + .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + elasticsearchDao.init(accessConfig); + setDao(elasticsearchDao); } @After @@ -137,4 +164,20 @@ protected MockHTable getMockHTable() { return table; } + + /** + * Install the index template to ensure that "guid" is of type "keyword". The + * {@link org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao} cannot find + * documents if the type is not "keyword". + * + * See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-term-query.html + */ + private static void installIndexTemplate() throws IOException { + HttpEntity broEntity = new NStringEntity(indexTemplate, ContentType.APPLICATION_JSON); + ElasticsearchClient client = ElasticsearchClientFactory.create(globalConfig); + Response response = client + .getLowLevelClient() + .performRequest("PUT", "/_template/test_template", Collections.emptyMap(), broEntity); + Assert.assertThat(response.getStatusLine().getStatusCode(), CoreMatchers.equalTo(200)); + } }
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java index 09f7df9..a240c56 100644 --- a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java +++ b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -282,8 +282,7 @@ } @Override - public Iterable<Document> getAllLatest( - List<GetRequest> getRequests) throws IOException { + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { Iterable<Document> ret = null; List<DocumentIterableContainer> output = indices.parallelStream().map(dao -> {
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java index dad6a52..0c0d1f1 100644 --- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java +++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java
@@ -18,6 +18,11 @@ package org.apache.metron.indexing.dao; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; import org.junit.Assert; @@ -27,9 +32,16 @@ import org.junit.rules.ExpectedException; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class MultiIndexDaoTest { @@ -40,24 +52,72 @@ private IndexDao dao1; private IndexDao dao2; + private Document document1; + private Document document2; + @Before public void setup() { dao1 = mock(IndexDao.class); dao2 = mock(IndexDao.class); multiIndexDao = new MultiIndexDao(dao1, dao2); + + document1 = new Document(new HashMap<>(), "guid", "bro", 1L); + document2 = new Document(new HashMap<>(), "guid", "bro", 2L); + } + + @Test + public void shouldUpdateAll() throws IOException { + Document actual = multiIndexDao.update(document1, Optional.of("bro")); + Assert.assertEquals(document1, actual); + + // both 'backing' daos should have received the update + verify(dao1).update(eq(document1), eq(Optional.of("bro"))); + verify(dao2).update(eq(document1), eq(Optional.of("bro"))); + } + + @Test(expected = IOException.class) + public void shouldThrowExceptionWithPartialFailureOnUpdate() throws IOException { + // dao2 will throw an exception causing the 'partial failure' + when(dao2.update(any(), any())).thenThrow(new IllegalStateException()); + + multiIndexDao.update(document1, Optional.of("bro")); + } + + @Test + public void shouldBatchUpdateAll() throws IOException { + Map<Document, Optional<String>> updates = new HashMap<Document, Optional<String>>() {{ + put(document1, Optional.of("bro")); + put(document2, Optional.of("bro")); + }}; + + Map<Document, Optional<String>> actual = multiIndexDao.batchUpdate(updates); + Assert.assertEquals(updates, actual); + + // both 'backing' daos should have received the updates + verify(dao1).batchUpdate(eq(updates)); + verify(dao2).batchUpdate(eq(updates)); + } + + @Test(expected = IOException.class) + public void shouldThrowExceptionWithPartialFailureOnBatchUpdate() throws IOException { + // dao2 will throw an exception causing the 'partial failure' + when(dao2.batchUpdate(any())).thenThrow(new IllegalStateException()); + + Map<Document, Optional<String>> updates = new HashMap<Document, Optional<String>>() {{ + put(document1, Optional.of("bro")); + put(document2, Optional.of("bro")); + }}; + + multiIndexDao.batchUpdate(updates); } @Test public void getLatestShouldReturnLatestAlert() throws Exception { - Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L); - Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L); - CommentAddRemoveRequest request = new CommentAddRemoveRequest(); request.setGuid("guid"); when(dao1.getLatest("guid", "bro")).thenReturn(document1); when(dao2.getLatest("guid", "bro")).thenReturn(document2); - Document expected = new Document(new HashMap<>(), "guid", "bro", 2L); Assert.assertEquals(expected, multiIndexDao.getLatest("guid", "bro")); } @@ -65,32 +125,125 @@ @Test public void addCommentShouldAddCommentToAlert() throws Exception { Document latest = mock(Document.class); - Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L); - Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L); CommentAddRemoveRequest request = new CommentAddRemoveRequest(); request.setGuid("guid"); when(dao1.addCommentToAlert(request, latest)).thenReturn(document1); when(dao2.addCommentToAlert(request, latest)).thenReturn(document2); - Document expected = new Document(new HashMap<>(), "guid", "bro", 2L); Assert.assertEquals(expected, multiIndexDao.addCommentToAlert(request, latest)); } + @Test(expected = IOException.class) + public void shouldThrowExceptionWithPartialFailureOnAddComment() throws Exception { + Document latest = mock(Document.class); + CommentAddRemoveRequest request = new CommentAddRemoveRequest(); + request.setGuid("guid"); + + // dao2 will throw an exception + when(dao1.addCommentToAlert(request, latest)).thenReturn(document1); + when(dao2.addCommentToAlert(request, latest)).thenThrow(new IllegalStateException()); + + multiIndexDao.addCommentToAlert(request, latest); + } + @Test public void removeCommentShouldRemoveCommentFromAlert() throws Exception { Document latest = mock(Document.class); - Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L); - Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L); CommentAddRemoveRequest request = new CommentAddRemoveRequest(); request.setGuid("guid"); when(dao1.removeCommentFromAlert(request, latest)).thenReturn(document1); when(dao2.removeCommentFromAlert(request, latest)).thenReturn(document2); - Document expected = new Document(new HashMap<>(), "guid", "bro", 2L); Assert.assertEquals(expected, multiIndexDao.removeCommentFromAlert(request, latest)); } + + @Test(expected = IOException.class) + public void shouldThrowExceptionWithPartialFailureOnRemoveComment() throws Exception { + Document latest = mock(Document.class); + CommentAddRemoveRequest request = new CommentAddRemoveRequest(); + request.setGuid("guid"); + + // dao2 will throw an exception + when(dao1.removeCommentFromAlert(request, latest)).thenReturn(document1); + when(dao2.removeCommentFromAlert(request, latest)).thenThrow(new IllegalStateException()); + + multiIndexDao.removeCommentFromAlert(request, latest); + } + + @Test + public void shouldGetColumnMetadata() throws Exception { + List<String> indices = Arrays.asList("bro"); + + Map<String, FieldType> expected = new HashMap<String, FieldType>() {{ + put("bro", FieldType.TEXT); + }}; + + when(dao1.getColumnMetadata(eq(indices))).thenReturn(null); + when(dao2.getColumnMetadata(eq(indices))).thenReturn(expected); + + Map<String, FieldType> actual = multiIndexDao.getColumnMetadata(indices); + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldGetColumnMetadataWithNulls() throws Exception { + List<String> indices = Arrays.asList("bro"); + + // both 'backing' DAOs respond with null + when(dao1.getColumnMetadata(eq(indices))).thenReturn(null); + when(dao2.getColumnMetadata(eq(indices))).thenReturn(null); + + Map<String, FieldType> actual = multiIndexDao.getColumnMetadata(indices); + Assert.assertNull(actual); + } + + @Test + public void shouldSearch() throws Exception { + SearchRequest request = new SearchRequest(); + SearchResponse expected = new SearchResponse(); + + when(dao1.search(eq(request))).thenReturn(null); + when(dao2.search(eq(request))).thenReturn(expected); + + SearchResponse actual = multiIndexDao.search(request); + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldSearchWithNulls() throws Exception { + SearchRequest request = new SearchRequest(); + + when(dao1.search(eq(request))).thenReturn(null); + when(dao2.search(eq(request))).thenReturn(null); + + SearchResponse actual = multiIndexDao.search(request); + Assert.assertNull(actual); + } + + @Test + public void shouldGroup() throws Exception { + GroupRequest request = new GroupRequest(); + GroupResponse expected = new GroupResponse(); + + when(dao1.group(eq(request))).thenReturn(null); + when(dao2.group(eq(request))).thenReturn(expected); + + GroupResponse actual = multiIndexDao.group(request); + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldGroupWithNulls() throws Exception { + GroupRequest request = new GroupRequest(); + + when(dao1.group(eq(request))).thenReturn(null); + when(dao2.group(eq(request))).thenReturn(null); + + GroupResponse actual = multiIndexDao.group(request); + Assert.assertNull(actual); + } }
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java index b15895a..6333d32 100644 --- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java +++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
@@ -65,7 +65,7 @@ protected static final String SENSOR_NAME = "test"; private static final String CF = "p"; - private MultiIndexDao dao; + private IndexDao dao; @Test public void testUpdate() throws Exception { @@ -73,16 +73,13 @@ final String guid = UUID.randomUUID().toString(); final Long timestamp = 1526306463050L; Document toUpdate = createDocument(guid, timestamp); - { - // update the document and validate - Document updated = getDao().update(toUpdate, Optional.of(SENSOR_NAME)); - Assert.assertEquals(toUpdate, updated); - } - { - // ensure the document is updated in the index - Document indexed = findUpdatedDoc(toUpdate.getDocument(), guid, SENSOR_NAME); - Assert.assertEquals(toUpdate, indexed); - } + + // update the document and validate + Document updated = getDao().update(toUpdate, Optional.of(SENSOR_NAME)); + Assert.assertEquals(toUpdate, updated); + + // ensure the document was updated in the index + assertDocumentIndexed(toUpdate); } @Test @@ -114,9 +111,9 @@ Assert.assertThat(updated.keySet(), hasItem(document3)); // ensure the documents were written to the index - Assert.assertEquals(document1, findUpdatedDoc(document1.getDocument(), guid1, SENSOR_NAME)); - Assert.assertEquals(document2, findUpdatedDoc(document2.getDocument(), guid2, SENSOR_NAME)); - Assert.assertEquals(document3, findUpdatedDoc(document3.getDocument(), guid3, SENSOR_NAME)); + assertDocumentIndexed(document1); + assertDocumentIndexed(document2); + assertDocumentIndexed(document3); } @Test @@ -226,6 +223,33 @@ return request; } + /** + * Ensures that a document was correctly indexed. + * @param expected The document that should have been indexed. + * @return The document that was retrieved from the index. + */ + private Document assertDocumentIndexed(Document expected) throws Exception { + // search the index for the document + Document actual = findUpdatedDoc(expected.getDocument(), expected.getGuid(), expected.getSensorType()); + + // most fields should match exactly, except the documentID + Assert.assertEquals(expected.getGuid(), actual.getGuid()); + Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); + Assert.assertEquals(expected.getSensorType(), actual.getSensorType()); + Assert.assertEquals(expected.getDocument(), actual.getDocument()); + + if(expected.getDocumentID().isPresent()) { + // the documentID was already defined in 'expected', this ID should have been used when the document was indexed + Assert.assertEquals(expected.getDocumentID().get(), actual.getDocumentID()); + + } else { + // if the documentID was not defined, the indexer should have created one + Assert.assertNotNull(expected.getDocumentID()); + } + + return actual; + } + private Document createAndIndexDocument(String guid) throws Exception { // create the document Long timestamp = 1526306463050L; @@ -236,7 +260,7 @@ Assert.assertEquals(toCreate, created); // ensure the document is indexed - return findUpdatedDoc(toCreate.getDocument(), guid, SENSOR_NAME); + return assertDocumentIndexed(created); } protected Document createDocument(String guid, Long timestamp) { @@ -280,7 +304,7 @@ return dao; } - protected void setDao(MultiIndexDao dao) { + protected void setDao(IndexDao dao) { this.dao = dao; }