blob: b2b5d3dfe696f11a318f65ad76edb857306cc567 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Iterables;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.HBaseDao;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.UpdateIntegrationTest;
import org.apache.metron.integration.UnableToStartException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Response;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import static org.hamcrest.MatcherAssert.assertThat;
public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
private static final String SENSOR_NAME= "test";
private static String indexDir = "target/elasticsearch_mutation";
private static String dateFormat = "yyyy.MM.dd.HH";
private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
private static ElasticSearchComponent es;
private static final String TABLE_NAME = "modifications";
private static final String CF = "p";
private static MockHTable table;
private static IndexDao elasticsearchDao;
private static AccessConfig accessConfig;
private static Map<String, Object> globalConfig;
/**
* {
* "template": "test*",
* "mappings": {
* "test_doc": {
* "properties": {
* "guid": {
* "type": "keyword"
* }
* }
* }
* }
* }
*/
@Multiline
private static String indexTemplate;
@Override
protected String getIndexName() {
return SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
}
@BeforeAll
public static void setupBeforeClass() throws UnableToStartException, IOException {
Configuration config = HBaseConfiguration.create();
MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
MockHBaseTableProvider.addToCache(TABLE_NAME, CF);
table = (MockHTable) tableProvider.getTable(config, TABLE_NAME);
globalConfig = new HashMap<>();
globalConfig.put("es.clustername", "metron");
globalConfig.put("es.port", "9200");
globalConfig.put("es.ip", "localhost");
globalConfig.put("es.date.format", dateFormat);
globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME);
globalConfig.put(HBaseDao.HBASE_CF, CF);
accessConfig = new AccessConfig();
accessConfig.setTableProvider(tableProvider);
accessConfig.setGlobalConfigSupplier(() -> globalConfig);
es = new ElasticSearchComponent.Builder()
.withHttpPort(9211)
.withIndexDir(new File(indexDir))
.withAccessConfig(accessConfig)
.build();
es.start();
installIndexTemplate();
}
@BeforeEach
public void setup() {
elasticsearchDao = new ElasticsearchDao()
.withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
elasticsearchDao.init(accessConfig);
setDao(elasticsearchDao);
}
@AfterEach
public void reset() {
es.reset();
table.clear();
}
@AfterAll
public static void teardown() {
es.stop();
}
@Override
protected void addTestData(String indexName, String sensorType,
List<Map<String, Object>> docs) throws Exception {
es.add(index, SENSOR_NAME
, Iterables.transform(docs,
m -> {
try {
return JSONUtils.INSTANCE.toJSON(m, true);
} catch (JsonProcessingException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
)
);
}
@Override
protected List<Map<String, Object>> getIndexedTestData(String indexName, String sensorType) throws Exception {
return es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
}
/**
* Install the index template to ensure that "guid" is of type "keyword". The
* {@link org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao} cannot find
* documents if the type is not "keyword".
*
* See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-term-query.html
*/
private static void installIndexTemplate() throws IOException {
HttpEntity broEntity = new NStringEntity(indexTemplate, ContentType.APPLICATION_JSON);
ElasticsearchClient client = ElasticsearchClientFactory.create(globalConfig);
Response response = client
.getLowLevelClient()
.performRequest("PUT", "/_template/test_template", Collections.emptyMap(), broEntity);
assertThat(response.getStatusLine().getStatusCode(), CoreMatchers.equalTo(200));
}
}