| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.elasticsearch.integration; |
| |
import com.fasterxml.jackson.core.JsonProcessingException;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
import org.apache.metron.elasticsearch.integration.utils.ElasticsearchTestUtils;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.indexing.dao.*;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.ReplaceRequest;
import org.elasticsearch.client.Client;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
| |
| |
| public class ElasticsearchUpdateIntegrationTest { |
| private static final String namespace = ElasticsearchUpdateIntegrationTest.class.getSimpleName().toLowerCase(); |
| private static final int MAX_RETRIES = 10; |
| private static final int SLEEP_MS = 500; |
| private static final String SENSOR_NAME= "test"; |
| private static final String TABLE_NAME = "modifications"; |
| private static final String CF = "p"; |
| private static String index = namespace + "_" + SENSOR_NAME + "_index"; |
| private static MockHTable table; |
| private static IndexDao esDao; |
| private static IndexDao hbaseDao; |
| private static MultiIndexDao dao; |
| private static Client client; |
| |
| /** |
| * { |
| "test_doc" : { |
| "properties" : { |
| "guid" : { |
| "type" : "keyword" |
| }, |
| "ip_src_addr" : { |
| "type" : "keyword" |
| }, |
| "score" : { |
| "type" : "integer" |
| }, |
| "alert" : { |
| "type" : "nested" |
| } |
| } |
| } |
| } |
| */ |
| @Multiline |
| public static String testTypeMappings; |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| Configuration config = HBaseConfiguration.create(); |
| MockHBaseTableProvider tableProvider = new MockHBaseTableProvider(); |
| tableProvider.addToCache(TABLE_NAME, CF); |
| table = (MockHTable)tableProvider.getTable(config, TABLE_NAME); |
| // setup the client |
| client = ElasticsearchUtils.getClient(ElasticsearchTestUtils.getGlobalConfig(), null); |
| client.admin().indices().prepareCreate(index).addMapping("test_doc", testTypeMappings).get(); |
| |
| hbaseDao = new HBaseDao(); |
| AccessConfig accessConfig = new AccessConfig(); |
| accessConfig.setTableProvider(tableProvider); |
| Map<String, Object> globalConfig = ElasticsearchTestUtils.getGlobalConfig(); |
| globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME); |
| globalConfig.put(HBaseDao.HBASE_CF, CF); |
| accessConfig.setGlobalConfigSupplier(() -> globalConfig); |
| |
| esDao = new ElasticsearchDao(); |
| |
| dao = new MultiIndexDao(hbaseDao, esDao); |
| dao.init(accessConfig); |
| |
| } |
| |
| @AfterClass |
| public static void teardown() { |
| ElasticsearchTestUtils.clearIndices(client, index); |
| } |
| |
| |
| |
| @Test |
| public void test() throws Exception { |
| List<Map<String, Object>> inputData = new ArrayList<>(); |
| for(int i = 0; i < 10;++i) { |
| final String name = "message" + i; |
| inputData.add( |
| new HashMap<String, Object>() {{ |
| put("source:type", SENSOR_NAME); |
| put("name" , name); |
| put("timestamp", System.currentTimeMillis()); |
| put(Constants.GUID, name); |
| }} |
| ); |
| } |
| ElasticsearchTestUtils.add(client, index, SENSOR_NAME + "_doc" |
| , inputData.stream().map(m -> { |
| try { |
| return JSONUtils.INSTANCE.toJSON(m, true); |
| } catch (JsonProcessingException e) { |
| throw new IllegalStateException(e.getMessage(), e); |
| } |
| }).collect(Collectors.toList())); |
| |
| List<Map<String,Object>> docs = null; |
| for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) { |
| docs = ElasticsearchTestUtils.getAllIndexedDocs(client, index, SENSOR_NAME + "_doc"); |
| if(docs.size() >= 10) { |
| break; |
| } |
| } |
| Assert.assertEquals(10, docs.size()); |
| //modify the first message and add a new field |
| { |
| Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ |
| put("new-field", "metron"); |
| }}; |
| String guid = "" + message0.get(Constants.GUID); |
| dao.replace(new ReplaceRequest(){{ |
| setReplacement(message0); |
| setGuid(guid); |
| setSensorType(SENSOR_NAME); |
| }}, Optional.empty()); |
| |
| Assert.assertEquals(1, table.size()); |
| Document doc = dao.getLatest(guid, SENSOR_NAME); |
| Assert.assertEquals(message0, doc.getDocument()); |
| { |
| //ensure hbase is up to date |
| Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, SENSOR_NAME))); |
| Result r = table.get(g); |
| NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); |
| Assert.assertEquals(1, columns.size()); |
| Assert.assertEquals(message0 |
| , JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) |
| , JSONUtils.MAP_SUPPLIER) |
| ); |
| } |
| { |
| //ensure ES is up-to-date |
| long cnt = 0; |
| for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) { |
| docs = ElasticsearchTestUtils.getAllIndexedDocs(client, index, SENSOR_NAME + "_doc"); |
| cnt = docs |
| .stream() |
| .filter(d -> message0.get("new-field").equals(d.get("new-field"))) |
| .count(); |
| } |
| Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); |
| } |
| } |
| //modify the same message and modify the new field |
| { |
| Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ |
| put("new-field", "metron2"); |
| }}; |
| String guid = "" + message0.get(Constants.GUID); |
| dao.replace(new ReplaceRequest(){{ |
| setReplacement(message0); |
| setGuid(guid); |
| setSensorType(SENSOR_NAME); |
| }}, Optional.empty()); |
| Assert.assertEquals(1, table.size()); |
| Document doc = dao.getLatest(guid, SENSOR_NAME); |
| Assert.assertEquals(message0, doc.getDocument()); |
| { |
| //ensure hbase is up to date |
| Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, SENSOR_NAME))); |
| Result r = table.get(g); |
| NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); |
| Assert.assertEquals(2, columns.size()); |
| Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) |
| , JSONUtils.MAP_SUPPLIER) |
| ); |
| Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new String(columns.firstEntry().getValue()) |
| , JSONUtils.MAP_SUPPLIER) |
| ); |
| } |
| { |
| //ensure ES is up-to-date |
| long cnt = 0; |
| for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,Thread.sleep(SLEEP_MS)) { |
| docs = ElasticsearchTestUtils.getAllIndexedDocs(client, index, SENSOR_NAME + "_doc"); |
| cnt = docs |
| .stream() |
| .filter(d -> message0.get("new-field").equals(d.get("new-field"))) |
| .count(); |
| } |
| |
| Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); |
| } |
| } |
| } |
| |
| |
| |
| } |