blob: 82a16e0816cb294c687e1828b5908526fb252786 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.json.Json;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonString;
import javax.json.JsonValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
public class TestSiteToSiteStatusReportingTask {
private ReportingContext context;
public MockSiteToSiteStatusReportingTask initTask(Map<PropertyDescriptor, String> customProperties,
ProcessGroupStatus pgStatus) throws InitializationException, IOException {
final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask();
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.putAll(customProperties);
context = Mockito.mock(ReportingContext.class);
Mockito.when(context.getStateManager())
.thenReturn(new MockStateManager(task));
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(pgStatus);
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
task.initialize(initContext);
return task;
}
@Test
public void testSerializedForm() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(16, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
JsonString componentId = firstElement.getJsonString("componentId");
assertEquals(pgStatus.getId(), componentId.getString());
JsonNumber terminatedThreads = firstElement.getJsonNumber("terminatedThreadCount");
assertEquals(1, terminatedThreads.longValue());
JsonString versionedFlowState = firstElement.getJsonString("versionedFlowState");
assertEquals("UP_TO_DATE", versionedFlowState.getString());
}
@Test
public void testComponentTypeFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(ProcessGroup|RootProcessGroup)");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(1, task.dataSent.size()); // Only root pg and 3 child pgs
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
assertEquals(pgStatus.getId(), componentId.getString());
}
@Test
public void testConnectionStatus() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonString backpressure = object.getJsonString("isBackPressureEnabled");
JsonString source = object.getJsonString("sourceName");
assertEquals("true", backpressure.getString());
assertEquals("source", source.getString());
JsonString dataSizeThreshold = object.getJsonString("backPressureDataSizeThreshold");
JsonNumber bytesThreshold = object.getJsonNumber("backPressureBytesThreshold");
assertEquals("1 KB", dataSizeThreshold.getString());
assertEquals(1024, bytesThreshold.intValue());
assertNull(object.get("destinationName"));
}
@Test
public void testConnectionStatusWithNullValues() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue destination = object.get("destinationName");
assertEquals(destination, JsonValue.NULL);
}
@Test
public void testComponentNameFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(3, task.dataSent.size()); // 3 processors for each of 4 groups
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
assertEquals("root.1.processor.1", componentId.getString());
}
@Test
public void testComponentNameFilter_nested() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 2, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(10, task.dataSent.size()); // 3 + (3 * 3) + (3 * 3 * 3) = 39, or 10 batches of 4
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
assertEquals("root.1.1.processor.1", componentId.getString());
}
@Test
public void testPortStatus() throws IOException, InitializationException {
ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"false");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonString runStatus = object.getJsonString("runStatus");
assertEquals(RunStatus.Stopped.name(), runStatus.getString());
boolean isTransmitting = object.getBoolean("transmitting");
assertFalse(isTransmitting);
JsonNumber inputBytes = object.getJsonNumber("inputBytes");
assertEquals(5, inputBytes.intValue());
assertNull(object.get("activeThreadCount"));
}
@Test
public void testPortStatusWithNullValues() throws IOException, InitializationException {
ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue activeThreadCount = object.get("activeThreadCount");
assertEquals(activeThreadCount, JsonValue.NULL);
}
@Test
public void testRemoteProcessGroupStatus() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(RemoteProcessGroup)");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
JsonNumber activeThreadCount = firstElement.getJsonNumber("activeThreadCount");
assertEquals(1L, activeThreadCount.longValue());
JsonString transmissionStatus = firstElement.getJsonString("transmissionStatus");
assertEquals("Transmitting", transmissionStatus.getString());
}
@Test
public void testRemoteProcessGroupStatusWithNullValues() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(RemoteProcessGroup)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
JsonValue targetURI = firstElement.get("targetURI");
assertEquals(targetURI, JsonValue.NULL);
}
@Test
public void testProcessorStatus() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Processor)");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonString parentName = object.getJsonString("parentName");
assertTrue(parentName.getString().startsWith("Awesome.1-"));
JsonString parentPath = object.getJsonString("parentPath");
assertTrue(parentPath.getString().startsWith("NiFi Flow / Awesome.1"));
JsonString runStatus = object.getJsonString("runStatus");
assertEquals(RunStatus.Running.name(), runStatus.getString());
JsonNumber inputBytes = object.getJsonNumber("inputBytes");
assertEquals(9, inputBytes.intValue());
JsonObject counterMap = object.getJsonObject("counters");
assertNotNull(counterMap);
assertEquals(10, counterMap.getInt("counter1"));
assertEquals(5, counterMap.getInt("counter2"));
assertNull(object.get("processorType"));
}
@Test
public void testProcessorStatusWithNullValues() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteUtils.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Processor)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue type = object.get("processorType");
assertEquals(type, JsonValue.NULL);
}
/***********************************
* Test component generator methods
***********************************/
public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix,
int maxRecursion, int currentDepth) {
Collection<ConnectionStatus> cStatus = new ArrayList<>();
Collection<PortStatus> ipStatus = new ArrayList<>();
Collection<PortStatus> opStatus = new ArrayList<>();
Collection<ProcessorStatus> pStatus = new ArrayList<>();
Collection<RemoteProcessGroupStatus> rpgStatus = new ArrayList<>();
Collection<ProcessGroupStatus> childPgStatus = new ArrayList<>();
if (currentDepth < maxRecursion) {
for(int i = 1; i < 4; i++) {
childPgStatus.add(generateProcessGroupStatus(id + "." + i, namePrefix + "." + i,
maxRecursion, currentDepth + 1));
}
}
for(int i = 1; i < 4; i++) {
pStatus.add(generateProcessorStatus(id + ".processor." + i, namePrefix + ".processor." + i));
}
for(int i = 1; i < 4; i++) {
cStatus.add(generateConnectionStatus(id + ".connection." + i, namePrefix + ".connection." + i));
}
for(int i = 1; i < 4; i++) {
rpgStatus.add(generateRemoteProcessGroupStatus(id + ".rpg." + i, namePrefix + ".rpg." + i));
}
for(int i = 1; i < 4; i++) {
ipStatus.add(generatePortStatus(id + ".ip." + i, namePrefix + ".ip." + i));
}
for(int i = 1; i < 4; i++) {
opStatus.add(generatePortStatus(id + ".op." + i, namePrefix + ".op." + i));
}
ProcessGroupStatus pgStatus = new ProcessGroupStatus();
pgStatus.setId(id);
pgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
pgStatus.setInputPortStatus(ipStatus);
pgStatus.setOutputPortStatus(opStatus);
pgStatus.setProcessGroupStatus(childPgStatus);
pgStatus.setRemoteProcessGroupStatus(rpgStatus);
pgStatus.setProcessorStatus(pStatus);
pgStatus.setVersionedFlowState(VersionedFlowState.UP_TO_DATE);
pgStatus.setActiveThreadCount(1);
pgStatus.setBytesRead(2L);
pgStatus.setBytesReceived(3l);
pgStatus.setBytesSent(4l);
pgStatus.setBytesTransferred(5l);
pgStatus.setBytesWritten(6l);
pgStatus.setConnectionStatus(cStatus);
pgStatus.setFlowFilesReceived(7);
pgStatus.setFlowFilesSent(8);
pgStatus.setFlowFilesTransferred(9);
pgStatus.setInputContentSize(10l);
pgStatus.setInputCount(11);
pgStatus.setOutputContentSize(12l);
pgStatus.setOutputCount(13);
pgStatus.setQueuedContentSize(14l);
pgStatus.setQueuedCount(15);
pgStatus.setTerminatedThreadCount(1);
return pgStatus;
}
public static PortStatus generatePortStatus(String id, String namePrefix) {
PortStatus pStatus = new PortStatus();
pStatus.setId(id);
pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
pStatus.setActiveThreadCount(null);
pStatus.setBytesReceived(1l);
pStatus.setBytesSent(2l);
pStatus.setFlowFilesReceived(3);
pStatus.setFlowFilesSent(4);
pStatus.setInputBytes(5l);
pStatus.setInputCount(6);
pStatus.setOutputBytes(7l);
pStatus.setOutputCount(8);
pStatus.setRunStatus(RunStatus.Stopped);
pStatus.setTransmitting(false);
return pStatus;
}
public static ProcessorStatus generateProcessorStatus(String id, String namePrefix) {
ProcessorStatus pStatus = new ProcessorStatus();
pStatus.setId(id);
pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
pStatus.setActiveThreadCount(0);
pStatus.setAverageLineageDuration(1l);
pStatus.setBytesRead(2l);
pStatus.setBytesReceived(3l);
pStatus.setBytesSent(4l);
pStatus.setBytesWritten(5l);
pStatus.setFlowFilesReceived(6);
pStatus.setFlowFilesRemoved(7);
pStatus.setFlowFilesSent(8);
pStatus.setInputBytes(9l);
pStatus.setInputCount(10);
pStatus.setInvocations(11);
pStatus.setOutputBytes(12l);
pStatus.setOutputCount(13);
pStatus.setProcessingNanos(14l);
pStatus.setType(null);
pStatus.setTerminatedThreadCount(1);
pStatus.setRunStatus(RunStatus.Running);
pStatus.setCounters(new HashMap<String, Long>() {{
put("counter1", 10L);
put("counter2", 5L);
}});
return pStatus;
}
public static RemoteProcessGroupStatus generateRemoteProcessGroupStatus(String id, String namePrefix) {
RemoteProcessGroupStatus rpgStatus = new RemoteProcessGroupStatus();
rpgStatus.setId(id);
rpgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
rpgStatus.setActiveRemotePortCount(0);
rpgStatus.setActiveThreadCount(1);
rpgStatus.setAverageLineageDuration(2l);
rpgStatus.setInactiveRemotePortCount(3);
rpgStatus.setReceivedContentSize(4l);
rpgStatus.setReceivedCount(5);
rpgStatus.setSentContentSize(6l);
rpgStatus.setSentCount(7);
rpgStatus.setTargetUri(null);
rpgStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
return rpgStatus;
}
public static ConnectionStatus generateConnectionStatus(String id, String namePrefix) {
ConnectionStatus cStatus = new ConnectionStatus();
cStatus.setId(id);
cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
cStatus.setBackPressureDataSizeThreshold("1 KB"); // sets backPressureBytesThreshold too
cStatus.setBackPressureObjectThreshold(1l);
cStatus.setInputBytes(2l);
cStatus.setInputCount(3);
cStatus.setMaxQueuedBytes(4l);
cStatus.setMaxQueuedCount(5);
cStatus.setOutputBytes(6);
cStatus.setOutputCount(7);
cStatus.setQueuedBytes(8l);
cStatus.setQueuedCount(9);
cStatus.setSourceId(id);
cStatus.setSourceName("source");
cStatus.setDestinationId(id);
cStatus.setDestinationName(null);
return cStatus;
}
public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) {
MockFlowFile mockFlowFile = new MockFlowFile(id);
mockFlowFile.putAttributes(attributes);
return mockFlowFile;
}
private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask {
public MockSiteToSiteStatusReportingTask() throws IOException {
super();
}
final List<byte[]> dataSent = new ArrayList<>();
@Override
public void setup(PropertyContext reportContext) {
if(siteToSiteClient == null) {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer((Answer<Object>) invocation -> {
final byte[] data = invocation.getArgument(0, byte[].class);
dataSent.add(data);
return null;
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
siteToSiteClient = client;
}
}
public List<byte[]> getDataSent() {
return dataSent;
}
}
}