blob: 8564b80530d16bcffe0df0a6035b0d1b9ca49a52 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.writer;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.json.simple.JSONObject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.mockito.Mockito.*;
/**
 * Unit tests for {@link BulkWriterComponent}.
 *
 * <p>The flush policy, the bulk message writer and the writer configuration are Mockito mocks,
 * so each test only checks how the component interacts with those collaborators.
 */
public class BulkWriterComponentTest {

  @Mock
  private FlushPolicy<JSONObject> flushPolicy;

  @Mock
  private BulkMessageWriter<JSONObject> bulkMessageWriter;

  @Mock
  private WriterConfiguration configurations;

  private final MessageId messageId1 = new MessageId("messageId1");
  private final MessageId messageId2 = new MessageId("messageId2");
  private final String sensorType = "testSensor";
  private final JSONObject message1 = new JSONObject();
  private final JSONObject message2 = new JSONObject();

  /** Ids of the two fixture messages, in the same order as {@link #messages}. */
  private List<MessageId> messageIds;

  /** The two fixture messages; rebuilt before every test. */
  private List<BulkMessage<JSONObject>> messages;

  /**
   * Initializes the mocks and rebuilds the two-message fixture. Every sensor is reported as
   * enabled unless a test overrides that stub.
   */
  @BeforeEach
  public void setup() {
    MockitoAnnotations.initMocks(this);
    message1.put("value", "message1");
    message2.put("value", "message2");
    messageIds = Arrays.asList(messageId1, messageId2);
    // Plain ArrayList instead of double-brace initialization: no anonymous subclass and no
    // hidden reference to the test instance.
    messages = new ArrayList<>();
    messages.add(new BulkMessage<>(messageId1, message1));
    messages.add(new BulkMessage<>(messageId2, message2));
    when(configurations.isEnabled(any())).thenReturn(true);
  }

  /**
   * Writes the two fixture messages one at a time. The first write must only consult the flush
   * policy (the mock answers {@code false}, so nothing reaches the writer). Once the policy is
   * stubbed to request a flush for the two-message batch, the second write must send both
   * messages to the writer and pass an all-success response to {@code onFlush}.
   */
  @Test
  public void writeShouldProperlyAckTuplesInBatch() throws Exception {
    BulkWriterComponent<JSONObject> bulkWriterComponent =
        new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
    BulkWriterResponse response = new BulkWriterResponse();
    response.addAllSuccesses(messageIds);
    when(bulkMessageWriter.write(sensorType, configurations, messages)).thenReturn(response);

    bulkWriterComponent.write(sensorType, messages.get(0), bulkMessageWriter, configurations);

    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages.subList(0, 1));
    verify(flushPolicy, times(0)).onFlush(any(), any());

    // Forget the first-write interactions so the final verifyNoMoreInteractions only sees the
    // second write.
    reset(flushPolicy);
    when(flushPolicy.shouldFlush(sensorType, configurations, messages)).thenReturn(true);

    bulkWriterComponent.write(sensorType, messages.get(1), bulkMessageWriter, configurations);

    BulkWriterResponse expectedResponse = new BulkWriterResponse();
    expectedResponse.addAllSuccesses(messageIds);
    // Freshly constructed BulkMessage instances: the verification matches by equality, not by
    // identity with the fixture objects.
    verify(bulkMessageWriter, times(1)).write(sensorType, configurations,
        Arrays.asList(new BulkMessage<>(messageId1, message1), new BulkMessage<>(messageId2, message2)));
    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages);
    verify(flushPolicy, times(1)).onFlush(sensorType, expectedResponse);
    verifyNoMoreInteractions(bulkMessageWriter, flushPolicy);
  }

  /**
   * Queues one message while the sensor is enabled, then disables the sensor and writes a second
   * message. The queued message must be written out and reported through {@code onFlush}; the
   * second message must be reported as a success through {@code onFlush} without any further
   * call to the writer.
   */
  @Test
  public void writeShouldFlushPreviousMessagesWhenDisabled() throws Exception {
    BulkWriterComponent<JSONObject> bulkWriterComponent =
        new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
    BulkMessage<JSONObject> beforeDisabledMessage = messages.get(0);
    BulkMessage<JSONObject> afterDisabledMessage = messages.get(1);
    List<BulkMessage<JSONObject>> beforeDisabledBatch = Collections.singletonList(beforeDisabledMessage);
    BulkWriterResponse beforeDisabledResponse = new BulkWriterResponse();
    beforeDisabledResponse.addSuccess(beforeDisabledMessage.getId());
    BulkWriterResponse afterDisabledResponse = new BulkWriterResponse();
    afterDisabledResponse.addSuccess(afterDisabledMessage.getId());
    when(bulkMessageWriter.write(sensorType, configurations, beforeDisabledBatch))
        .thenReturn(beforeDisabledResponse);

    bulkWriterComponent.write(sensorType, beforeDisabledMessage, bulkMessageWriter, configurations);

    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, beforeDisabledBatch);
    verify(flushPolicy, times(0)).onFlush(any(), any());

    // Overrides the "always enabled" stub from setup() for this sensor.
    when(configurations.isEnabled(sensorType)).thenReturn(false);

    bulkWriterComponent.write(sensorType, afterDisabledMessage, bulkMessageWriter, configurations);

    verify(bulkMessageWriter, times(1)).write(sensorType, configurations, beforeDisabledBatch);
    verify(flushPolicy, times(1)).onFlush(sensorType, beforeDisabledResponse);
    verify(flushPolicy, times(1)).onFlush(sensorType, afterDisabledResponse);
    verifyNoMoreInteractions(bulkMessageWriter, flushPolicy);
  }

  /**
   * Same two-step write as the batch test, but the writer returns a response that marks every
   * message id as failed. That error response must be passed to {@code onFlush}.
   */
  @Test
  public void writeShouldProperlyHandleWriterErrors() throws Exception {
    BulkWriterComponent<JSONObject> bulkWriterComponent =
        new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
    Throwable e = new Exception("test exception");
    BulkWriterResponse response = new BulkWriterResponse();
    response.addAllErrors(e, messageIds);
    when(bulkMessageWriter.write(sensorType, configurations, messages)).thenReturn(response);

    bulkWriterComponent.write(sensorType, messages.get(0), bulkMessageWriter, configurations);

    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages.subList(0, 1));
    verify(flushPolicy, times(0)).onFlush(any(), any());

    reset(flushPolicy);
    when(flushPolicy.shouldFlush(sensorType, configurations, messages)).thenReturn(true);

    bulkWriterComponent.write(sensorType, messages.get(1), bulkMessageWriter, configurations);

    BulkWriterResponse expectedErrorResponse = new BulkWriterResponse();
    expectedErrorResponse.addAllErrors(e, messageIds);
    verify(bulkMessageWriter, times(1)).write(sensorType, configurations, messages);
    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages);
    verify(flushPolicy, times(1)).onFlush(sensorType, expectedErrorResponse);
    verifyNoMoreInteractions(bulkMessageWriter, flushPolicy);
  }

  /**
   * Same two-step write as the batch test, but the writer throws instead of returning a
   * response. {@code onFlush} must receive a response that records the thrown exception as the
   * error for every message id in the batch.
   */
  @Test
  public void writeShouldProperlyHandleWriterException() throws Exception {
    BulkWriterComponent<JSONObject> bulkWriterComponent =
        new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
    Throwable e = new Exception("test exception");
    when(bulkMessageWriter.write(sensorType, configurations, messages)).thenThrow(e);

    bulkWriterComponent.write(sensorType, messages.get(0), bulkMessageWriter, configurations);

    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages.subList(0, 1));
    verify(flushPolicy, times(0)).onFlush(any(), any());

    reset(flushPolicy);
    when(flushPolicy.shouldFlush(sensorType, configurations, messages)).thenReturn(true);

    bulkWriterComponent.write(sensorType, messages.get(1), bulkMessageWriter, configurations);

    BulkWriterResponse expectedErrorResponse = new BulkWriterResponse();
    expectedErrorResponse.addAllErrors(e, messageIds);
    verify(bulkMessageWriter, times(1)).write(sensorType, configurations, messages);
    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages);
    verify(flushPolicy, times(1)).onFlush(sensorType, expectedErrorResponse);
    verifyNoMoreInteractions(flushPolicy);
  }

  /**
   * Flushes three messages while the writer's response mentions only two of them (one success,
   * one error). The response passed to {@code onFlush} must additionally list the unmentioned
   * id as a success.
   */
  @Test
  public void flushShouldAckMissingTuples() throws Exception {
    BulkWriterComponent<JSONObject> bulkWriterComponent =
        new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
    MessageId successId = new MessageId("successId");
    MessageId errorId = new MessageId("errorId");
    MessageId missingId = new MessageId("missingId");
    JSONObject successMessage = new JSONObject();
    successMessage.put("name", "success");
    JSONObject errorMessage = new JSONObject();
    errorMessage.put("name", "error");
    JSONObject missingMessage = new JSONObject();
    missingMessage.put("name", "missing");
    List<BulkMessage<JSONObject>> allMessages = new ArrayList<>();
    allMessages.add(new BulkMessage<>(successId, successMessage));
    allMessages.add(new BulkMessage<>(errorId, errorMessage));
    allMessages.add(new BulkMessage<>(missingId, missingMessage));
    // The writer's response deliberately omits missingId.
    BulkWriterResponse bulkWriterResponse = new BulkWriterResponse();
    bulkWriterResponse.addSuccess(successId);
    Throwable throwable = mock(Throwable.class);
    bulkWriterResponse.addError(throwable, errorId);
    // Uses the @Mock field; a separate raw-typed local mock would only shadow it.
    when(bulkMessageWriter.write(sensorType, configurations, allMessages)).thenReturn(bulkWriterResponse);

    bulkWriterComponent.flush(sensorType, bulkMessageWriter, configurations, allMessages);

    BulkWriterResponse expectedResponse = new BulkWriterResponse();
    expectedResponse.addSuccess(successId);
    expectedResponse.addError(throwable, errorId);
    expectedResponse.addSuccess(missingId);
    verify(flushPolicy, times(1)).onFlush(sensorType, expectedResponse);
    verifyNoMoreInteractions(flushPolicy);
  }

  /**
   * Queues one message for each of two sensors and then calls {@code flushAll}. The flush policy
   * must be consulted exactly once per sensor with that sensor's single-message batch, and must
   * receive no other calls.
   */
  @Test
  public void flushAllShouldFlushAllSensors() {
    BulkWriterComponent<JSONObject> bulkWriterComponent =
        new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
    bulkWriterComponent.write("sensor1", messages.get(0), bulkMessageWriter, configurations);
    bulkWriterComponent.write("sensor2", messages.get(1), bulkMessageWriter, configurations);

    // Drop the interactions recorded by the two writes; only flushAll is under test.
    reset(flushPolicy);

    bulkWriterComponent.flushAll(bulkMessageWriter, configurations);

    verify(flushPolicy, times(1)).shouldFlush("sensor1", configurations, messages.subList(0, 1));
    verify(flushPolicy, times(1)).shouldFlush("sensor2", configurations, messages.subList(1, 2));
    verifyNoMoreInteractions(flushPolicy);
  }
}