blob: 64892061d89b0e67a7c91104f4e8bdff50d031b9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.HBaseDao;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.MultiIndexDao;
import org.apache.metron.indexing.dao.UpdateIntegrationTest;
import org.apache.metron.integration.UnableToStartException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
private static final String SENSOR_NAME= "test";
private static String indexDir = "target/elasticsearch_mutation";
private static String dateFormat = "yyyy.MM.dd.HH";
private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
private static ElasticSearchComponent es;
private static final String TABLE_NAME = "modifications";
private static final String CF = "p";
private static MockHTable table;
private static IndexDao hbaseDao;
private static IndexDao elasticsearchDao;
private static AccessConfig accessConfig;
private static Map<String, Object> globalConfig;
@Override
protected String getIndexName() {
return SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
}
@BeforeClass
public static void setupBeforeClass() throws UnableToStartException, IOException {
Configuration config = HBaseConfiguration.create();
MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
MockHBaseTableProvider.addToCache(TABLE_NAME, CF);
table = (MockHTable) tableProvider.getTable(config, TABLE_NAME);
globalConfig = new HashMap<>();
globalConfig.put("es.clustername", "metron");
globalConfig.put("es.port", "9200");
globalConfig.put("es.ip", "localhost");
globalConfig.put("es.date.format", dateFormat);
globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME);
globalConfig.put(HBaseDao.HBASE_CF, CF);
accessConfig = new AccessConfig();
accessConfig.setTableProvider(tableProvider);
accessConfig.setGlobalConfigSupplier(() -> globalConfig);
es = new ElasticSearchComponent.Builder()
.withHttpPort(9211)
.withIndexDir(new File(indexDir))
.withAccessConfig(accessConfig)
.build();
es.start();
}
@Before
public void setup() {
hbaseDao = new HBaseDao();
elasticsearchDao = new ElasticsearchDao();
MultiIndexDao dao = new MultiIndexDao(hbaseDao, elasticsearchDao);
dao.init(accessConfig);
setDao(dao);
}
@After
public void reset() {
es.reset();
table.clear();
}
@AfterClass
public static void teardown() {
es.stop();
}
@Override
protected void addTestData(String indexName, String sensorType,
List<Map<String, Object>> docs) throws Exception {
es.add(index, SENSOR_NAME
, Iterables.transform(docs,
m -> {
try {
return JSONUtils.INSTANCE.toJSON(m, true);
} catch (JsonProcessingException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
)
);
}
@Override
protected List<Map<String, Object>> getIndexedTestData(String indexName, String sensorType) throws Exception {
return es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
}
@Override
protected MockHTable getMockHTable() {
return table;
}
}