// blob: 2256f26d9903926907a4aad4f660d25869ec2c07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.writer.bolt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.log4j.Level;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.storm.common.message.MessageGetters;
import org.apache.metron.common.system.FakeClock;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
import org.apache.metron.test.utils.UnitTestHelper;
import org.apache.metron.writer.BulkWriterComponent;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
 * Unit tests for {@link BulkMessageWriterBolt}.
 *
 * <p>The tests drive the bolt with mocked tuples and a mocked {@link BulkMessageWriter} and check
 * when the writer is invoked (batch size, batch timeout, tick tuple) and how invalid input and
 * writer failures are reported on the output collector.
 */
public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {

  /** Number of messages, message ids and tuples built by {@link #parseMessages()}. */
  private static final int MESSAGE_COUNT = 5;

  // NOTE: the Javadoc block below is not documentation. @Multiline injects its text as the value
  // of sampleMessageString (it is parsed as JSON in parseMessages()), so edit it only to change
  // the sample message itself.
  /**
   * {
   * "field": "value",
   * "source.type": "test"
   * }
   */
  @Multiline
  private String sampleMessageString;

  @Mock
  private BulkMessageWriter<JSONObject> bulkMessageWriter;

  /** Spied bolt shared by the flush tests; rebuilt before every test. */
  private BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt;
  private JSONObject sampleMessage;
  private List<MessageId> messageIdList;
  private List<BulkMessage<JSONObject>> messageList;
  private List<JSONObject> fullMessageList;
  private List<Tuple> tupleList;

  /**
   * Builds the per-test fixtures: {@value #MESSAGE_COUNT} messages with GUIDs
   * {@code message1..message5} and fields {@code value1..value5}, a mocked tuple returning each
   * message from its {@code "message"} field, the matching {@link MessageId} and
   * {@link BulkMessage} lists, and a spied bolt backed by the mocked writer.
   *
   * @throws ParseException if the sample message is not valid JSON
   */
  @Before
  public void parseMessages() throws ParseException {
    sampleMessage = (JSONObject) new JSONParser().parse(sampleMessageString);

    // The @Mock fields must exist before the bolt is built around bulkMessageWriter.
    MockitoAnnotations.initMocks(this);

    fullMessageList = new ArrayList<>();
    messageIdList = new ArrayList<>();
    tupleList = new ArrayList<>();
    messageList = new ArrayList<>();
    bulkMessageWriterBolt = spy(newBolt(MessageGetters.JSON_FROM_FIELD.name()));

    for (int i = 1; i <= MESSAGE_COUNT; i++) {
      String messageId = "message" + i;
      sampleMessage.put(Constants.GUID, messageId);
      sampleMessage.put("field", "value" + i);
      // Clone so each list entry keeps its own GUID/field while the template keeps mutating.
      JSONObject message = (JSONObject) sampleMessage.clone();
      fullMessageList.add(message);

      messageIdList.add(new MessageId(messageId));
      Tuple mockTuple = mock(Tuple.class);
      when(mockTuple.getValueByField("message")).thenReturn(message);
      tupleList.add(mockTuple);
      messageList.add(new BulkMessage<>(messageId, message));
    }
  }

  /**
   * Creates an un-spied bolt that writes through the mocked {@link #bulkMessageWriter} and reads
   * messages from the {@code "message"} getter field.
   *
   * @param messageGetterName name of the message getter to configure on the bolt
   * @return the configured bolt
   */
  private BulkMessageWriterBolt<IndexingConfigurations> newBolt(String messageGetterName) {
    return new BulkMessageWriterBolt<IndexingConfigurations>("zookeeperUrl", "INDEXING")
        .withBulkMessageWriter(bulkMessageWriter)
        .withMessageGetter(messageGetterName)
        .withMessageGetterField("message");
  }

  /**
   * Hands the bolt the {@code client} and {@code cache} fixtures (inherited from the base test
   * class) and loads the sample sensor indexing config for
   * {@link BaseEnrichmentBoltTest#sensorType}.
   *
   * <p>The config file is opened in a try-with-resources block so the stream is closed whether or
   * not the update succeeds.
   *
   * @param bolt the bolt to configure
   * @throws Exception if the config file cannot be opened or applied
   */
  private void wireZookeeperAndIndexingConfig(BulkMessageWriterBolt<IndexingConfigurations> bolt)
      throws Exception {
    bolt.setCuratorFramework(client);
    bolt.setZKCache(cache);
    try (FileInputStream indexingConfig =
        new FileInputStream("../" + BaseEnrichmentBoltTest.sampleSensorIndexingConfigPath)) {
      bolt.getConfigurations()
          .updateSensorIndexingConfig(BaseEnrichmentBoltTest.sensorType, indexingConfig);
    }
  }

  /**
   * A message without a {@code source.type} field should be emitted on the error stream, reported
   * and ack'd.
   */
  @Test
  public void testSourceTypeMissing() throws Exception {
    // setup the bolt
    BulkMessageWriterBolt<IndexingConfigurations> bolt =
        newBolt(MessageGetters.JSON_FROM_FIELD.name());
    wireZookeeperAndIndexingConfig(bolt);

    // initialize the bolt
    bolt.declareOutputFields(declarer);
    Map<String, Object> stormConf = new HashMap<>();
    bolt.prepare(stormConf, topologyContext, outputCollector);

    // create a message with no source type
    JSONObject message = (JSONObject) new JSONParser().parse(sampleMessageString);
    message.remove("source.type");
    when(tuple.getValueByField("message")).thenReturn(message);

    // the tuple should be handled as an error and ack'd
    bolt.execute(tuple);
    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
    verify(outputCollector, times(1)).ack(tuple);
    verify(outputCollector, times(1)).reportError(any(Throwable.class));
    Mockito.verifyNoMoreInteractions(outputCollector);
  }

  /**
   * Covers three phases on the shared bolt: a failing writer {@code init} surfaces from
   * {@code prepare} as a {@link RuntimeException}; the writer is not called for the first four
   * tuples and is called once with all five messages on the fifth; and a writer that throws on
   * {@code write} results in every tuple being ack'd and sent to the error stream.
   */
  @Test
  public void testFlushOnBatchSize() throws Exception {
    Map<String, Object> stormConf = new HashMap<>();
    wireZookeeperAndIndexingConfig(bulkMessageWriterBolt);

    // Phase 1: prepare() must fail when the writer cannot be initialised.
    doThrow(new Exception()).when(bulkMessageWriter)
        .init(eq(stormConf), any(WriterConfiguration.class));
    try {
      bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
      fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
    } catch (RuntimeException expected) {
      // Expected: this is the failure the test is asserting on.
    }
    reset(bulkMessageWriter);

    // Phase 2: a clean prepare() initialises the writer exactly once.
    when(bulkMessageWriter.getName()).thenReturn("hdfs");
    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
    verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));

    // Phase 3: nothing is written for the first four tuples...
    for (int i = 0; i < 4; i++) {
      bulkMessageWriterBolt.execute(tupleList.get(i));
      verify(bulkMessageWriter, times(0)).write(eq(BaseEnrichmentBoltTest.sensorType),
          any(WriterConfiguration.class), anyList());
    }
    // ...and the fifth triggers a single write of the whole batch, after which all are ack'd.
    BulkWriterResponse successResponse = new BulkWriterResponse();
    successResponse.addAllSuccesses(messageIdList);
    when(bulkMessageWriter.write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), eq(messageList))).thenReturn(successResponse);
    bulkMessageWriterBolt.execute(tupleList.get(4));
    verify(bulkMessageWriter, times(1)).write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), eq(messageList));
    tupleList.forEach(t -> verify(outputCollector, times(1)).ack(t));
    reset(outputCollector);

    // Phase 4: a failing write() acks every tuple and routes each one to the error stream.
    doThrow(new Exception()).when(bulkMessageWriter).write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), anyList());
    UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.FATAL);
    try {
      for (int i = 0; i < 5; i++) {
        bulkMessageWriterBolt.execute(tupleList.get(i));
      }
    } finally {
      // Restore the level even if execute() throws so later tests are not left at FATAL.
      UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR);
    }
    tupleList.forEach(t -> verify(outputCollector, times(1)).ack(t));
    verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
    verify(outputCollector, times(1)).reportError(any(Throwable.class));

    Mockito.verifyNoMoreInteractions(outputCollector);
  }

  /**
   * With a batch timeout divisor of 3 the bolt reports a max batch timeout of 4. Four tuples are
   * queued without a write; after the fake clock advances 5 seconds the fifth tuple triggers one
   * write of all five messages and every tuple is ack'd.
   */
  @Test
  public void testFlushOnBatchTimeout() throws Exception {
    FakeClock clock = new FakeClock();
    bulkMessageWriterBolt = bulkMessageWriterBolt.withBatchTimeoutDivisor(3);
    wireZookeeperAndIndexingConfig(bulkMessageWriterBolt);

    bulkMessageWriterBolt.declareOutputFields(declarer);
    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));

    Map<String, Object> stormConf = new HashMap<>();
    when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock);
    verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));

    int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
    assertEquals(4, batchTimeout);
    for (int i = 0; i < 4; i++) {
      bulkMessageWriterBolt.execute(tupleList.get(i));
      verify(bulkMessageWriter, times(0)).write(eq(BaseEnrichmentBoltTest.sensorType),
          any(WriterConfiguration.class), any(List.class));
    }

    // Move past the asserted timeout before the last tuple arrives.
    clock.elapseSeconds(5);
    BulkWriterResponse successResponse = new BulkWriterResponse();
    successResponse.addAllSuccesses(messageIdList);
    when(bulkMessageWriter.write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), eq(messageList))).thenReturn(successResponse);
    bulkMessageWriterBolt.execute(tupleList.get(4));
    verify(bulkMessageWriter, times(1)).write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), eq(messageList));
    tupleList.forEach(t -> verify(outputCollector, times(1)).ack(t));

    Mockito.verifyNoMoreInteractions(outputCollector);
  }

  /**
   * With the default divisor the bolt reports a max batch timeout of 14. Five tuples are queued
   * without a write; a tick tuple with the clock at 2 seconds is ack'd but writes nothing, and a
   * second tick with the clock at 9 seconds triggers one write of all five messages.
   */
  @Test
  public void testFlushOnTickTuple() throws Exception {
    FakeClock clock = new FakeClock();
    wireZookeeperAndIndexingConfig(bulkMessageWriterBolt);

    bulkMessageWriterBolt.declareOutputFields(declarer);
    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));

    Map<String, Object> stormConf = new HashMap<>();
    when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock);
    verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));

    int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
    assertEquals(14, batchTimeout);
    for (int i = 0; i < 5; i++) {
      bulkMessageWriterBolt.execute(tupleList.get(i));
      verify(bulkMessageWriter, times(0)).write(eq(BaseEnrichmentBoltTest.sensorType),
          any(WriterConfiguration.class), any());
    }

    Tuple tickTuple = mock(Tuple.class);
    when(tickTuple.getValueByField("message")).thenReturn(null);
    when(tickTuple.getSourceComponent()).thenReturn("__system"); //mark the tuple as a TickTuple, part 1 of 2
    when(tickTuple.getSourceStreamId()).thenReturn("__tick"); //mark the tuple as a TickTuple, part 2 of 2
    BulkWriterResponse successResponse = new BulkWriterResponse();
    successResponse.addAllSuccesses(messageIdList);
    when(bulkMessageWriter.write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), eq(messageList))).thenReturn(successResponse);

    // First tick: ack'd, but the batch is not written yet.
    clock.advanceToSeconds(2);
    bulkMessageWriterBolt.execute(tickTuple);
    verify(bulkMessageWriter, times(0)).write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), eq(messageList));
    verify(outputCollector, times(1)).ack(tickTuple); // 1 tick

    // Second tick: the queued batch is written once and every tuple is ack'd.
    clock.advanceToSeconds(9);
    bulkMessageWriterBolt.execute(tickTuple);
    verify(bulkMessageWriter, times(1)).write(eq(BaseEnrichmentBoltTest.sensorType),
        any(WriterConfiguration.class), eq(messageList));
    assertEquals(5, tupleList.size());
    tupleList.forEach(t -> verify(outputCollector, times(1)).ack(t));
    verify(outputCollector, times(2)).ack(tickTuple);

    Mockito.verifyNoMoreInteractions(outputCollector);
  }

  /**
   * If an invalid message is sent to indexing, the message should be handled as an error
   * and the topology should continue processing.
   */
  @Test
  public void testMessageInvalid() throws Exception {
    FakeClock clock = new FakeClock();

    // setup the bolt
    BulkMessageWriterBolt<IndexingConfigurations> bolt =
        newBolt(MessageGetters.JSON_FROM_POSITION.name());
    wireZookeeperAndIndexingConfig(bolt);

    // initialize the bolt
    bolt.declareOutputFields(declarer);
    Map<String, Object> stormConf = new HashMap<>();
    bolt.prepare(stormConf, topologyContext, outputCollector, clock);

    // execute a tuple that contains an invalid message
    byte[] invalidJSON = "this is not valid JSON".getBytes(StandardCharsets.UTF_8);
    when(tuple.getBinary(0)).thenReturn(invalidJSON);
    bolt.execute(tuple);

    // the tuple should be handled as an error and ack'd
    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
    verify(outputCollector, times(1)).ack(tuple);
    verify(outputCollector, times(1)).reportError(any(Throwable.class));
    Mockito.verifyNoMoreInteractions(outputCollector);
  }

  /** A freshly constructed bolt declares an {@code "error"} stream with a {@code "message"} field. */
  @Test
  public void testDeclareOutputFields() {
    BulkMessageWriterBolt<IndexingConfigurations> bolt =
        new BulkMessageWriterBolt<IndexingConfigurations>("zookeeperUrl", "INDEXING");
    bolt.declareOutputFields(declarer);
    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
  }
}