blob: 4870c620a911fa849686206568319405ce9304b0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.writers.integration;
import com.google.common.collect.ImmutableList;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.TestConstants;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.enrichment.converter.EnrichmentConverter;
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.integration.*;
import org.apache.metron.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.*;
import static org.junit.jupiter.api.Assertions.*;
public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationTest {
/**
* {
* "parserClassName": "org.apache.metron.parsers.csv.CSVParser",
* "writerClassName": "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter",
* "sensorTopic": "dummy",
* "outputTopic": "output",
* "errorTopic": "error",
* "parserConfig": {
* "shew.table": "dummy",
* "shew.cf": "cf",
* "shew.keyColumns": "col2",
* "shew.enrichmentType": "et",
* "shew.hbaseProvider": "org.apache.metron.hbase.mock.MockHBaseTableProvider",
* "columns" : {
* "col1": 0,
* "col2": 1,
* "col3": 2
* }
* }
* }
*/
@Multiline
public static String parserConfigJSON;
@Test
public void test() throws UnableToStartException, IOException {
final String sensorType = "dummy";
// the input messages to parse
final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
add(Bytes.toBytes("col11,col12,col13"));
add(Bytes.toBytes("col21,col22,col23"));
add(Bytes.toBytes("col31,col32,col33"));
}};
// setup external components; kafka, zookeeper
MockHBaseTableProvider.addToCache(sensorType, "cf");
final Properties topologyProperties = new Properties();
final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
add(new KafkaComponent.Topic(sensorType, 1));
}});
topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
System.out.println("Workspace: " + System.getProperty("user.dir"));
System.out.println("Configs path: ../" + TestConstants.SAMPLE_CONFIG_PATH);
ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
.withGlobalConfigsPath("../" + TestConstants.SAMPLE_CONFIG_PATH)
.withParserSensorConfig(sensorType, parserConfig);
ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
.withSensorTypes(Collections.singletonList(sensorType))
.withTopologyProperties(topologyProperties)
.withBrokerUrl(kafkaComponent.getBrokerList())
.withOutputTopic(parserConfig.getOutputTopic())
.build();
ComponentRunner runner = new ComponentRunner.Builder()
.withComponent("zk", zkServerComponent)
.withComponent("kafka", kafkaComponent)
.withComponent("config", configUploadComponent)
.withComponent("org/apache/storm", parserTopologyComponent)
.withMillisecondsBetweenAttempts(5000)
.withCustomShutdownOrder(new String[]{"org/apache/storm","config","kafka","zk"})
.withNumRetries(10)
.build();
try {
runner.start();
kafkaComponent.writeMessages(sensorType, inputMessages);
ProcessorResult<List<LookupKV<EnrichmentKey, EnrichmentValue>>> result =
runner.process(new Processor<List<LookupKV<EnrichmentKey, EnrichmentValue>>>() {
List<LookupKV<EnrichmentKey, EnrichmentValue>> messages = null;
@Override
public ReadinessState process(ComponentRunner runner) {
MockHTable table = (MockHTable) MockHBaseTableProvider.getFromCache(sensorType);
if (table != null && table.size() == inputMessages.size()) {
EnrichmentConverter converter = new EnrichmentConverter();
messages = new ArrayList<>();
try {
for (Result r : table.getScanner(Bytes.toBytes("cf"))) {
messages.add(converter.fromResult(r, "cf"));
}
} catch (IOException e) {
}
return ReadinessState.READY;
}
return ReadinessState.NOT_READY;
}
@Override
public ProcessorResult<List<LookupKV<EnrichmentKey, EnrichmentValue>>> getResult() {
ProcessorResult.Builder<List<LookupKV<EnrichmentKey,EnrichmentValue>>> builder = new ProcessorResult.Builder();
return builder.withResult(messages).build();
}
});
Set<String> validIndicators = new HashSet<>(ImmutableList.of("col12", "col22", "col32"));
Map<String, Map<String, String>> validMetadata = new HashMap<String, Map<String, String>>() {{
put("col12", new HashMap<String, String>() {{
put("col1", "col11");
put("col3", "col13");
}});
put("col22", new HashMap<String, String>() {{
put("col1", "col21");
put("col3", "col23");
}});
put("col32", new HashMap<String, String>() {{
put("col1", "col31");
put("col3", "col33");
}});
}};
for (LookupKV<EnrichmentKey, EnrichmentValue> kv : result.getResult()) {
assertTrue(validIndicators.contains(kv.getKey().indicator));
assertEquals(kv.getValue().getMetadata().get("source.type"), "dummy");
assertNotNull(kv.getValue().getMetadata().get("timestamp"));
assertNotNull(kv.getValue().getMetadata().get("original_string"));
Map<String, String> metadata = validMetadata.get(kv.getKey().indicator);
for (Map.Entry<String, String> x : metadata.entrySet()) {
assertEquals(kv.getValue().getMetadata().get(x.getKey()), x.getValue());
}
assertEquals(metadata.size() + 4, kv.getValue().getMetadata().size());
}
}
finally {
if(runner != null) {
runner.stop();
}
}
}
}