blob: d3d88ee5b0de38f97aa5858d5ade9507da47cc43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
public class TestSiteToSiteProvenanceReportingTask {
private final ReportingContext context = Mockito.mock(ReportingContext.class);
private final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
private final ConfigurationContext confContext = Mockito.mock(ConfigurationContext.class);
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
return setup(event, properties, 2500);
}
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties, long maxEventId) throws IOException {
final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
when(context.getStateManager())
.thenReturn(new MockStateManager(task));
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(confContext).getProperty(Mockito.any(PropertyDescriptor.class));
final AtomicInteger totalEvents = new AtomicInteger(0);
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
@Override
public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable {
final long startId = invocation.getArgument(0, Long.class);
final int maxRecords = invocation.getArgument(1, Integer.class);
final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>();
for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && totalEvents.get() < maxEventId; i++) {
if (event != null) {
eventsToReturn.add(event);
}
totalEvents.getAndIncrement();
}
return eventsToReturn;
}
}).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
ProcessGroupStatus pgRoot = new ProcessGroupStatus();
pgRoot.setId("root");
when(eventAccess.getControllerStatus()).thenReturn(pgRoot);
// Add child Process Groups.
// Root -> (A, B -> (B2 -> (B3)))
final ProcessGroupStatus pgA = new ProcessGroupStatus();
pgA.setId("pgA");
final ProcessGroupStatus pgB = new ProcessGroupStatus();
pgB.setId("pgB");
final ProcessGroupStatus pgB2 = new ProcessGroupStatus();
pgB2.setId("pgB2");
final ProcessGroupStatus pgB3 = new ProcessGroupStatus();
pgB3.setId("pgB3");
final Collection<ProcessGroupStatus> childPGs = pgRoot.getProcessGroupStatus();
childPGs.add(pgA);
childPGs.add(pgB);
pgB.getProcessGroupStatus().add(pgB2);
pgB2.getProcessGroupStatus().add(pgB3);
// Add Processors.
final ProcessorStatus prcRoot = new ProcessorStatus();
prcRoot.setId("1234");
pgRoot.getProcessorStatus().add(prcRoot);
final ProcessorStatus prcA = new ProcessorStatus();
prcA.setId("A001");
prcA.setName("Processor in PGA");
pgA.getProcessorStatus().add(prcA);
final ProcessorStatus prcB = new ProcessorStatus();
prcB.setId("B001");
prcB.setName("Processor in PGB");
pgB.getProcessorStatus().add(prcB);
final ProcessorStatus prcB2 = new ProcessorStatus();
prcB2.setId("B201");
prcB2.setName("Processor in PGB2");
pgB2.getProcessorStatus().add(prcB2);
final ProcessorStatus prcB3 = new ProcessorStatus();
prcB3.setId("B301");
prcB3.setName("Processor in PGB3");
pgB3.getProcessorStatus().add(prcB3);
// Add connection status to test Remote Input/Output Ports
final ConnectionStatus b2RemoteInputPort = new ConnectionStatus();
b2RemoteInputPort.setGroupId("pgB2");
b2RemoteInputPort.setSourceId("B201");
b2RemoteInputPort.setDestinationId("riB2");
b2RemoteInputPort.setDestinationName("Remote Input Port name");
pgB2.getConnectionStatus().add(b2RemoteInputPort);
final ConnectionStatus b3RemoteOutputPort = new ConnectionStatus();
b3RemoteOutputPort.setGroupId("pgB3");
b3RemoteOutputPort.setSourceId("roB3");
b3RemoteOutputPort.setSourceName("Remote Output Port name");
b3RemoteOutputPort.setDestinationId("B301");
pgB3.getConnectionStatus().add(b3RemoteOutputPort);
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
Mockito.doAnswer(new Answer<Long>() {
@Override
public Long answer(final InvocationOnMock invocation) throws Throwable {
return maxEventId;
}
}).when(provenanceRepository).getMaxEventId();
when(context.getEventAccess()).thenReturn(eventAccess);
when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
final ComponentLog logger = Mockito.mock(ComponentLog.class);
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
when(initContext.getLogger()).thenReturn(logger);
return task;
}
@Test
public void testSerializedForm() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue details = object.get("details");
JsonObject msgArray = object.getJsonObject("updatedAttributes");
assertNull(details);
assertEquals(msgArray.getString("abc"), event.getAttributes().get("abc"));
assertNull(msgArray.get("emptyVal"));
}
@Test
public void testSerializedFormWithNullValues() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue details = object.get("details");
JsonValue emptyVal = object.getJsonObject("updatedAttributes").get("emptyVal");
assertEquals(JsonValue.NULL,details);
assertEquals(JsonValue.NULL,emptyVal);
}
@Test
public void testFilterComponentIdSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "2345, 5678, 1234");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
}
@Test
public void testFilterComponentIdNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "9999");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterComponentTypeSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "dummy.*");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
}
@Test
public void testFilterComponentName() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_NAME, "Processor in .*");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_NAME_EXCLUDE, ".*PGB");
// A001 has name "Processor in PGA" and should be picked
ProvenanceEventRecord event = createProvenanceEventRecord("A001", "dummy");
MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("A001", reportedEvent.get("componentId").asText());
assertEquals("Processor in PGA", reportedEvent.get("componentName").asText());
// B001 has name "Processor in PGB" and should not be picked
event = createProvenanceEventRecord("B001", "dummy");
task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterComponentTypeExcludeSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterComponentTypeNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "proc.*");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterComponentTypeNoResultExcluded() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "proc.*");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
}
@Test
public void testFilterEventTypeSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE, notExistingType, DROP");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
}
@Test
public void testFilterEventTypeExcludeSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE_EXCLUDE, "RECEIVE, notExistingType, DROP");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterEventTypeNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "DROP");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterMultiFilterNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "2345, 5678, 1234");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "dummy.*");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "DROP");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterMultiFilterSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "2345, 5678, 1234");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "dummy.*");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
}
@Test
public void testFilterMultiFilterExcludeTakesPrecedence() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testFilterProcessGroupId() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "pgB2");
// B201 belongs to ProcessGroup B2, so it should be picked.
ProvenanceEventRecord event = createProvenanceEventRecord("B201", "dummy");
MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("B201", reportedEvent.get("componentId").asText());
assertEquals("Processor in PGB2", reportedEvent.get("componentName").asText());
// B301 belongs to PG B3, whose parent is PGB2, so it should be picked, too.
event = createProvenanceEventRecord("B301", "dummy");
task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("B301", reportedEvent.get("componentId").asText());
assertEquals("Processor in PGB3", reportedEvent.get("componentName").asText());
// A001 belongs to PG A, whose parent is the root PG, so it should be filtered out.
event = createProvenanceEventRecord("A001", "dummy");
task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
@Test
public void testRemotePorts() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteUtils.BATCH_SIZE, "1000");
properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "riB2,roB3");
// riB2 is a Remote Input Port in Process Group B2.
ProvenanceEventRecord event = createProvenanceEventRecord("riB2", "Remote Input Port");
MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("riB2", reportedEvent.get("componentId").asText());
assertEquals("Remote Input Port name", reportedEvent.get("componentName").asText());
assertEquals("pgB2", reportedEvent.get("processGroupId").asText());
// roB3 is a Remote Output Port in Process Group B3.
event = createProvenanceEventRecord("roB3", "Remote Output Port");
task = setup(event, properties, 1);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
assertEquals("roB3", reportedEvent.get("componentId").asText());
assertEquals("Remote Output Port name", reportedEvent.get("componentName").asText());
assertEquals("pgB3", reportedEvent.get("processGroupId").asText());
}
@Test
public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
final long maxEventId = 2500;
// create the mock reporting task and mock state manager
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
final MockSiteToSiteProvenanceReportingTask task = setup(null, properties);
final MockStateManager stateManager = new MockStateManager(task);
// create the state map and set the last id to the same value as maxEventId
final Map<String,String> state = new HashMap<>();
state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId));
stateManager.setState(state, Scope.LOCAL);
// setup the mock provenance repository to return maxEventId
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
Mockito.doAnswer(new Answer<Long>() {
@Override
public Long answer(final InvocationOnMock invocation) throws Throwable {
return maxEventId;
}
}).when(provenanceRepository).getMaxEventId();
// setup the mock EventAccess to return the mock provenance repository
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
task.initialize(initContext);
// execute the reporting task and should not produce any data b/c max id same as previous id
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) {
MockFlowFile mockFlowFile = new MockFlowFile(id);
mockFlowFile.putAttributes(attributes);
return mockFlowFile;
}
private ProvenanceEventRecord createProvenanceEventRecord() {
return createProvenanceEventRecord("1234", "dummy processor");
}
private ProvenanceEventRecord createProvenanceEventRecord(final String componentId, final String componentType) {
final String uuid = "10000000-0000-0000-0000-000000000000";
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
attributes.put("xyz", "abc");
attributes.put("filename", "file-" + uuid);
attributes.put("emptyVal",null);
final Map<String, String> prevAttrs = new HashMap<>();
attributes.put("filename", "1234.xyz");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
attributes.put("uuid", uuid);
builder.fromFlowFile(createFlowFile(3L, attributes));
builder.setAttributes(prevAttrs, attributes);
builder.setComponentId(componentId);
builder.setComponentType(componentType);
return builder.build();
}
private static final class MockSiteToSiteProvenanceReportingTask extends SiteToSiteProvenanceReportingTask {
public MockSiteToSiteProvenanceReportingTask() throws IOException {
super();
}
final List<byte[]> dataSent = new ArrayList<>();
@Override
public void setup(PropertyContext reportContext) {
if(siteToSiteClient == null) {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer((Answer<Object>) invocation -> {
final byte[] data = invocation.getArgument(0, byte[].class);
dataSent.add(data);
return null;
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
siteToSiteClient = client;
}
}
public List<byte[]> getDataSent() {
return dataSent;
}
}
}