blob: 2255ce2b91e864391c18d271a243af66500543b8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers.bolt;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.error.MetronError;
import org.apache.metron.common.message.metadata.RawMessage;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.parsers.DefaultParserRunnerResults;
import org.apache.metron.parsers.ParserRunnerImpl;
import org.apache.metron.parsers.ParserRunnerResults;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.metron.test.error.MetronErrorJSONMatcher;
import org.apache.metron.writer.AckTuplesPolicy;
import org.apache.storm.Config;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;
public class ParserBoltTest extends BaseBoltTest {
@Mock
private Tuple t1;
@Mock
private ParserRunnerImpl parserRunner;
@Mock
private WriterHandler writerHandler;
@Mock
private MessageGetStrategy messageGetStrategy;
@Mock
private Context stellarContext;
@Mock
private AckTuplesPolicy bulkWriterResponseHandler;
private class MockParserRunner extends ParserRunnerImpl {
private boolean isInvalid = false;
private RawMessage rawMessage;
private List<JSONObject> messages;
public MockParserRunner(HashSet<String> sensorTypes) {
super(sensorTypes);
}
@Override
public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
this.rawMessage = rawMessage;
for(JSONObject message: messages) {
if (!isInvalid) {
parserRunnerResults.addMessage(message);
} else {
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_INVALID)
.withSensorType(Collections.singleton(sensorType))
.addRawMessage(message);
parserRunnerResults.addError(error);
}
}
return parserRunnerResults;
}
protected void setInvalid(boolean isInvalid) {
this.isInvalid = isInvalid;
}
protected void setMessages(List<JSONObject> messages) {
this.messages = messages;
}
protected RawMessage getRawMessage() {
return rawMessage;
}
}
@Test
public void withBatchTimeoutDivisorShouldSetBatchTimeoutDivisor() {
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}).withBatchTimeoutDivisor(5);
assertEquals(5, parserBolt.getBatchTimeoutDivisor());
}
@Test
public void shouldThrowExceptionOnInvalidBatchTimeoutDivisor() {
IllegalArgumentException e =
assertThrows(
IllegalArgumentException.class,
() ->
new ParserBolt(
"zookeeperUrl",
parserRunner,
new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}})
.withBatchTimeoutDivisor(-1));
assertEquals("batchTimeoutDivisor must be positive. Value provided was -1", e.getMessage());
}
@Test
public void shouldGetComponentConfiguration() {
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
public ParserConfigurations getConfigurations() {
ParserConfigurations configurations = new ParserConfigurations();
SensorParserConfig sensorParserConfig = new SensorParserConfig();
sensorParserConfig.setParserConfig(new HashMap<String, Object>() {{
put(IndexingConfigurations.BATCH_SIZE_CONF, 10);
}});
configurations.updateSensorParserConfig("yaf", sensorParserConfig);
return configurations;
}
};
Map<String, Object> componentConfiguration = parserBolt.getComponentConfiguration();
assertEquals(1, componentConfiguration.size());
assertEquals( 14, componentConfiguration.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS));
}
@Test
public void shouldPrepare() {
Map stormConf = mock(Map.class);
SensorParserConfig yafConfig = mock(SensorParserConfig.class);
when(yafConfig.getSensorTopic()).thenReturn("yafTopic");
when(yafConfig.getParserConfig()).thenReturn(new HashMap<String, Object>() {{
put(IndexingConfigurations.BATCH_SIZE_CONF, 10);
}});
ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
protected SensorParserConfig getSensorParserConfig(String sensorType) {
if ("yaf".equals(sensorType)) {
return yafConfig;
}
return null;
}
@Override
public ParserConfigurations getConfigurations() {
return parserConfigurations;
}
});
doReturn(stellarContext).when(parserBolt).initializeStellar();
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
parserBolt.prepare(stormConf, topologyContext, outputCollector);
verify(parserRunner, times(1)).init(any(Supplier.class), eq(stellarContext));
verify(yafConfig, times(1)).init();
Map<String, String> topicToSensorMap = parserBolt.getTopicToSensorMap();
assertEquals(1, topicToSensorMap.size());
assertEquals("yaf", topicToSensorMap.get("yafTopic"));
verify(writerHandler).init(eq(stormConf), eq(topologyContext), eq(outputCollector), eq(parserConfigurations), any(AckTuplesPolicy.class), eq(14));
}
@Test
public void shouldThrowExceptionOnMissingConfig() {
Map stormConf = mock(Map.class);
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}});
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
IllegalStateException e = assertThrows(IllegalStateException.class, () -> parserBolt.prepare(stormConf, topologyContext, outputCollector));
assertEquals("Unable to retrieve a parser config for yaf", e.getMessage());
}
@Test
public void executeShouldHandleTickTuple() throws Exception {
when(t1.getSourceComponent()).thenReturn("__system");
when(t1.getSourceStreamId()).thenReturn("__tick");
ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
public ParserConfigurations getConfigurations() {
return parserConfigurations;
}
};
parserBolt.setMessageGetStrategy(messageGetStrategy);
parserBolt.setOutputCollector(outputCollector);
parserBolt.execute(t1);
verify(writerHandler, times(1)).flush(parserConfigurations, messageGetStrategy);
verify(outputCollector, times(1)).ack(t1);
}
@Test
public void shouldExecuteOnSuccess() throws Exception {
when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
ParserConfigurations parserConfigurations = new ParserConfigurations();
parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
public ParserConfigurations getConfigurations() {
return parserConfigurations;
}
});
parserBolt.setMessageGetStrategy(messageGetStrategy);
parserBolt.setOutputCollector(outputCollector);
parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
put("yafTopic", "yaf");
}});
parserBolt.setAckTuplesPolicy(bulkWriterResponseHandler);
JSONObject message = new JSONObject();
message.put(Constants.GUID, "messageId");
message.put("field", "value");
mockParserRunner.setMessages(Collections.singletonList(message));
RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
{
parserBolt.execute(t1);
assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
verify(bulkWriterResponseHandler).addTupleMessageIds(t1, Collections.singletonList("messageId"));
verify(writerHandler, times(1)).write("yaf", new BulkMessage<>("messageId", message), parserConfigurations);
}
}
@Test
public void shouldExecuteOnSuccessWithMultipleMessages() throws Exception {
when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
ParserConfigurations parserConfigurations = new ParserConfigurations();
parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
public ParserConfigurations getConfigurations() {
return parserConfigurations;
}
});
parserBolt.setMessageGetStrategy(messageGetStrategy);
parserBolt.setOutputCollector(outputCollector);
parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
put("yafTopic", "yaf");
}});
parserBolt.setAckTuplesPolicy(bulkWriterResponseHandler);
List<BulkMessage<JSONObject>> messages = new ArrayList<>();
for(int i = 0; i < 5; i++) {
String messageId = String.format("messageId%s", i + 1);
JSONObject message = new JSONObject();
message.put(Constants.GUID, messageId);
message.put("field", String.format("value%s", i + 1));
messages.add(new BulkMessage<>(messageId, message));
}
mockParserRunner.setMessages(messages.stream().map(BulkMessage::getMessage).collect(Collectors.toList()));
RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
{
// Verify the correct message is written and ack is handled
parserBolt.execute(t1);
assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
InOrder inOrder = inOrder(bulkWriterResponseHandler, writerHandler);
inOrder.verify(bulkWriterResponseHandler).addTupleMessageIds(t1, Arrays.asList("messageId1", "messageId2", "messageId3", "messageId4", "messageId5"));
inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(0), parserConfigurations);
inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(1), parserConfigurations);
inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(2), parserConfigurations);
inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(3), parserConfigurations);
inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(4), parserConfigurations);
}
verifyNoMoreInteractions(writerHandler, bulkWriterResponseHandler, outputCollector);
}
@Test
public void shouldExecuteOnError() throws Exception {
when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{
add("yaf");
}});
mockParserRunner.setInvalid(true);
ParserConfigurations parserConfigurations = new ParserConfigurations();
parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
public ParserConfigurations getConfigurations() {
return parserConfigurations;
}
};
parserBolt.setMessageGetStrategy(messageGetStrategy);
parserBolt.setOutputCollector(outputCollector);
parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
put("yafTopic", "yaf");
}});
JSONObject message = new JSONObject();
message.put("field", "value");
mockParserRunner.setMessages(Collections.singletonList(message));
RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_INVALID)
.withSensorType(Collections.singleton("yaf"))
.addRawMessage(message);
parserBolt.execute(t1);
assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
verify(outputCollector, times(1)).ack(t1);
}
@Test
public void shouldThrowExceptionOnFailedExecute() {
when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
ParserConfigurations parserConfigurations = new ParserConfigurations();
parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
doThrow(new IllegalStateException("parserRunner.execute failed")).when(parserRunner).execute(eq("yaf"), any(), eq(parserConfigurations));
ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
public ParserConfigurations getConfigurations() {
return parserConfigurations;
}
};
parserBolt.setMessageGetStrategy(messageGetStrategy);
parserBolt.setOutputCollector(outputCollector);
parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
put("yafTopic", "yaf");
}});
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(new IllegalStateException("parserRunner.execute failed"))
.withSensorType(Collections.singleton("yaf"))
.addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
parserBolt.execute(t1);
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
verify(outputCollector, times(1)).ack(t1);
}
@Test
public void shouldThrowExceptionOnFailedWrite() throws Exception {
when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
ParserConfigurations parserConfigurations = new ParserConfigurations();
parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
doThrow(new IllegalStateException("write failed")).when(writerHandler).write(any(), any(), any());
ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
put("yaf", writerHandler);
}}) {
@Override
public ParserConfigurations getConfigurations() {
return parserConfigurations;
}
});
parserBolt.setMessageGetStrategy(messageGetStrategy);
parserBolt.setOutputCollector(outputCollector);
parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
put("yafTopic", "yaf");
}});
parserBolt.setAckTuplesPolicy(bulkWriterResponseHandler);
JSONObject message = new JSONObject();
message.put(Constants.GUID, "messageId");
message.put("field", "value");
mockParserRunner.setMessages(Collections.singletonList(message));
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(new IllegalStateException("write failed"))
.withSensorType(Collections.singleton("yaf"))
.addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
parserBolt.execute(t1);
verify(bulkWriterResponseHandler, times(1)).addTupleMessageIds(t1, Collections.singletonList("messageId"));
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
verify(outputCollector, times(1)).ack(t1);
}
}