blob: 3361a0fc14a22e5f267708f9d685ecddba743e39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO;
import org.apache.nifi.web.api.dto.ParameterDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VariableDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceSearchValueDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
import org.apache.nifi.web.api.entity.ParameterContextUpdateRequestEntity;
import org.apache.nifi.web.api.entity.ParameterEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class NiFiClientUtil {
// Logger used by the wait/poll helpers in this class.
private static final Logger logger = LoggerFactory.getLogger(NiFiClientUtil.class);
// Client through which every request made by this utility is issued.
private final NiFiClient nifiClient;
// Version string used as the bundle version when creating test processors and controller services.
private final String nifiVersion;
/**
 * Creates a utility bound to the given client.
 *
 * @param client the client used for all requests
 * @param nifiVersion the version used as the bundle version of test extensions
 */
public NiFiClientUtil(final NiFiClient client, final String nifiVersion) {
    this.nifiVersion = nifiVersion;
    this.nifiClient = client;
}
/**
 * Creates a processor from the test-extensions bundle, identified by its simple class name.
 *
 * @param simpleTypeName the class name without package; the test processors package is prepended
 * @return the entity returned by the server
 */
public ProcessorEntity createProcessor(final String simpleTypeName) throws NiFiClientException, IOException {
    final String qualifiedType = NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + simpleTypeName;
    return createProcessor(qualifiedType, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
/**
 * Creates a processor from the test-extensions bundle inside the given Process Group.
 *
 * @param simpleTypeName the class name without package; the test processors package is prepended
 * @param groupId the ID of the Process Group that will contain the processor
 * @return the entity returned by the server
 */
public ProcessorEntity createProcessor(final String simpleTypeName, final String groupId) throws NiFiClientException, IOException {
    final String qualifiedType = NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + simpleTypeName;
    return createProcessor(qualifiedType, groupId, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
/**
 * Creates a processor of the given fully qualified type in the "root" Process Group.
 *
 * @param type the fully qualified processor type
 * @param bundleGroupId the bundle's group
 * @param artifactId the bundle's artifact
 * @param version the bundle's version
 * @return the entity returned by the server
 */
public ProcessorEntity createProcessor(final String type, final String bundleGroupId, final String artifactId, final String version) throws NiFiClientException, IOException {
    final String rootGroupId = "root";
    return createProcessor(type, rootGroupId, bundleGroupId, artifactId, version);
}
/**
 * Creates a processor of the given type and bundle coordinates in the given Process Group.
 *
 * @param type the fully qualified processor type
 * @param processGroupId the ID of the Process Group that will contain the processor
 * @param bundleGroupId the bundle's group
 * @param artifactId the bundle's artifact
 * @param version the bundle's version
 * @return the entity returned by the server
 */
public ProcessorEntity createProcessor(final String type, final String processGroupId, final String bundleGroupId, final String artifactId, final String version)
        throws NiFiClientException, IOException {
    final BundleDTO bundleDto = new BundleDTO();
    bundleDto.setVersion(version);
    bundleDto.setArtifact(artifactId);
    bundleDto.setGroup(bundleGroupId);

    final ProcessorDTO processorDto = new ProcessorDTO();
    processorDto.setBundle(bundleDto);
    processorDto.setType(type);

    final ProcessorEntity request = new ProcessorEntity();
    request.setRevision(createNewRevision());
    request.setComponent(processorDto);

    return nifiClient.getProcessorClient().createProcessor(processGroupId, request);
}
/**
 * Creates a controller service from the test-extensions bundle in the "root" Process Group.
 *
 * @param simpleTypeName the class name without package; the test controller-service package is prepended
 * @return the entity returned by the server
 */
public ControllerServiceEntity createControllerService(final String simpleTypeName) throws NiFiClientException, IOException {
    final String qualifiedType = NiFiSystemIT.TEST_CS_PACKAGE + "." + simpleTypeName;
    return createControllerService(qualifiedType, "root", NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
/**
 * Creates a controller service of the given type and bundle coordinates in the given Process Group.
 *
 * @param type the fully qualified controller service type
 * @param processGroupId the ID of the Process Group that will own the service
 * @param bundleGroupId the bundle's group
 * @param artifactId the bundle's artifact
 * @param version the bundle's version
 * @return the entity returned by the server
 */
public ControllerServiceEntity createControllerService(final String type, final String processGroupId, final String bundleGroupId, final String artifactId, final String version)
        throws NiFiClientException, IOException {
    final BundleDTO bundleDto = new BundleDTO();
    bundleDto.setVersion(version);
    bundleDto.setArtifact(artifactId);
    bundleDto.setGroup(bundleGroupId);

    final ControllerServiceDTO serviceDto = new ControllerServiceDTO();
    serviceDto.setBundle(bundleDto);
    serviceDto.setType(type);

    final ControllerServiceEntity request = new ControllerServiceEntity();
    request.setRevision(createNewRevision());
    request.setComponent(serviceDto);

    return nifiClient.getControllerServicesClient().createControllerService(processGroupId, request);
}
/**
 * Builds (without sending to the server) a parameter entity wrapping a DTO with the given values.
 *
 * @param name the parameter name
 * @param description the parameter description
 * @param sensitive whether the parameter is sensitive
 * @param value the parameter value
 * @return the newly built entity
 */
public ParameterEntity createParameterEntity(final String name, final String description, final boolean sensitive, final String value) {
    final ParameterDTO parameterDto = new ParameterDTO();
    parameterDto.setValue(value);
    parameterDto.setSensitive(sensitive);
    parameterDto.setDescription(description);
    parameterDto.setName(name);

    final ParameterEntity wrapper = new ParameterEntity();
    wrapper.setParameter(parameterDto);
    return wrapper;
}
/**
 * Builds (without sending to the server) a Parameter Context entity carrying a fresh revision.
 *
 * @param name the context name
 * @param description the context description
 * @param parameters the parameters to place in the context
 * @return the newly built entity
 */
public ParameterContextEntity createParameterContextEntity(final String name, final String description, final Set<ParameterEntity> parameters) {
    final ParameterContextDTO dto = new ParameterContextDTO();
    dto.setParameters(parameters);
    dto.setDescription(description);
    dto.setName(name);

    final ParameterContextEntity wrapper = new ParameterContextEntity();
    wrapper.setRevision(createNewRevision());
    wrapper.setComponent(dto);
    return wrapper;
}
/**
 * Builds a revision at version 0 whose client ID is this object's class name.
 *
 * @return the new revision
 */
public RevisionDTO createNewRevision() {
    final String clientId = getClass().getName();
    final RevisionDTO revision = new RevisionDTO();
    revision.setVersion(0L);
    revision.setClientId(clientId);
    return revision;
}
/**
 * Builds a Parameter Context reference with the given ID and no name.
 *
 * @param id the Parameter Context ID
 * @return the reference entity
 */
private ParameterContextReferenceEntity createReferenceEntity(final String id) {
    final String noName = null;
    return createReferenceEntity(id, noName);
}
/**
 * Builds a Parameter Context reference whose entity and component both carry the given ID.
 *
 * @param id the Parameter Context ID
 * @param name the Parameter Context name set on the component
 * @return the reference entity
 */
private ParameterContextReferenceEntity createReferenceEntity(final String id, final String name) {
    final ParameterContextReferenceDTO component = new ParameterContextReferenceDTO();
    component.setName(name);
    component.setId(id);

    final ParameterContextReferenceEntity reference = new ParameterContextReferenceEntity();
    reference.setComponent(component);
    reference.setId(id);
    return reference;
}
/**
 * Fetches the Process Group, points it at the given Parameter Context and submits the update.
 *
 * @param groupId the ID of the Process Group to update
 * @param parameterContext the context to reference; only its ID is used
 * @return the entity returned by the update call
 */
public ProcessGroupEntity setParameterContext(final String groupId, final ParameterContextEntity parameterContext) throws NiFiClientException, IOException {
    final ProcessGroupEntity groupEntity = nifiClient.getProcessGroupClient().getProcessGroup(groupId);
    final ParameterContextReferenceEntity contextReference = createReferenceEntity(parameterContext.getId());
    groupEntity.getComponent().setParameterContext(contextReference);
    return nifiClient.getProcessGroupClient().updateProcessGroup(groupEntity);
}
/**
 * Creates a Parameter Context holding a single parameter.
 *
 * @param contextName the name of the new context
 * @param parameterName the name of its only parameter
 * @param parameterValue the value of its only parameter
 * @return the entity returned by the server
 */
public ParameterContextEntity createParameterContext(final String contextName, final String parameterName, final String parameterValue) throws NiFiClientException, IOException {
    final Map<String, String> singleParameter = Collections.singletonMap(parameterName, parameterValue);
    return createParameterContext(contextName, singleParameter);
}
/**
 * Creates a Parameter Context on the server from a name-to-value map.
 * Each parameter is created non-sensitive and without a description.
 *
 * @param contextName the name of the new context
 * @param parameters parameter names mapped to their values
 * @return the entity returned by the server
 */
public ParameterContextEntity createParameterContext(final String contextName, final Map<String, String> parameters) throws NiFiClientException, IOException {
    final Set<ParameterEntity> entities = new HashSet<>();
    for (final Map.Entry<String, String> parameter : parameters.entrySet()) {
        entities.add(createParameterEntity(parameter.getKey(), null, false, parameter.getValue()));
    }

    final ParameterContextEntity request = createParameterContextEntity(contextName, null, entities);
    return nifiClient.getParamContextClient().createParamContext(request);
}
/**
 * Submits an update of the given Parameter Context with a single parameter.
 *
 * @param existingEntity the context to update
 * @param paramName the parameter name
 * @param paramValue the parameter value
 * @return the update request entity returned by the server
 */
public ParameterContextUpdateRequestEntity updateParameterContext(final ParameterContextEntity existingEntity, final String paramName, final String paramValue)
        throws NiFiClientException, IOException {
    final Map<String, String> singleParameter = Collections.singletonMap(paramName, paramValue);
    return updateParameterContext(existingEntity, singleParameter);
}
/**
 * Replaces the parameter set on the given entity's component and submits the update.
 * Each parameter is built non-sensitive and without a description.
 *
 * @param existingEntity the context to update; its component is mutated
 * @param parameters parameter names mapped to their values
 * @return the update request entity returned by the server
 */
public ParameterContextUpdateRequestEntity updateParameterContext(final ParameterContextEntity existingEntity, final Map<String, String> parameters) throws NiFiClientException, IOException {
    final Set<ParameterEntity> entities = new HashSet<>();
    for (final Map.Entry<String, String> parameter : parameters.entrySet()) {
        final ParameterEntity built = createParameterEntity(parameter.getKey(), null, false, parameter.getValue());
        entities.add(built);
    }

    existingEntity.getComponent().setParameters(entities);
    return nifiClient.getParamContextClient().updateParamContext(existingEntity);
}
/**
 * Polls the Parameter Context update request every 100 ms until it reports completion.
 * There is no timeout.
 *
 * @param contextId the Parameter Context ID
 * @param requestId the update request ID
 * @throws RuntimeException if the completed request carries a failure reason
 */
public void waitForParameterContextRequestToComplete(final String contextId, final String requestId) throws NiFiClientException, IOException, InterruptedException {
    ParameterContextUpdateRequestEntity current = nifiClient.getParamContextClient().getParamContextUpdateRequest(contextId, requestId);
    while (!current.getRequest().isComplete()) {
        Thread.sleep(100L);
        current = nifiClient.getParamContextClient().getParamContextUpdateRequest(contextId, requestId);
    }

    final String failureReason = current.getRequest().getFailureReason();
    if (failureReason != null) {
        throw new RuntimeException("Parameter Context Update failed: " + failureReason);
    }
}
/**
 * Updates the processor with a config that sets only the execution node.
 *
 * @param currentEntity the processor to update
 * @param executionNode the execution node; its enum name is sent
 * @return the entity returned by the update call
 */
public ProcessorEntity updateProcessorExecutionNode(final ProcessorEntity currentEntity, final ExecutionNode executionNode) throws NiFiClientException, IOException {
    final String executionNodeName = executionNode.name();
    final ProcessorConfigDTO executionNodeConfig = new ProcessorConfigDTO();
    executionNodeConfig.setExecutionNode(executionNodeName);
    return updateProcessorConfig(currentEntity, executionNodeConfig);
}
/**
 * Updates the processor with a config that sets only the given properties.
 *
 * @param currentEntity the processor to update
 * @param properties property names mapped to values
 * @return the entity returned by the update call
 */
public ProcessorEntity updateProcessorProperties(final ProcessorEntity currentEntity, final Map<String, String> properties) throws NiFiClientException, IOException {
    final ProcessorConfigDTO propertiesConfig = new ProcessorConfigDTO();
    propertiesConfig.setProperties(properties);

    return updateProcessorConfig(currentEntity, propertiesConfig);
}
/**
 * Updates the processor with a config that sets only the run duration.
 *
 * @param currentEntity the processor to update
 * @param runDuration the run duration in milliseconds
 * @return the entity returned by the update call
 */
public ProcessorEntity updateProcessorRunDuration(final ProcessorEntity currentEntity, final int runDuration) throws NiFiClientException, IOException {
    final long runDurationMillis = runDuration;
    final ProcessorConfigDTO runDurationConfig = new ProcessorConfigDTO();
    runDurationConfig.setRunDurationMillis(runDurationMillis);
    return updateProcessorConfig(currentEntity, runDurationConfig);
}
/**
 * Updates the processor with a config that sets only the scheduling period.
 *
 * @param currentEntity the processor to update
 * @param schedulingPeriod the scheduling period, passed through unchanged
 * @return the entity returned by the update call
 */
public ProcessorEntity updateProcessorSchedulingPeriod(final ProcessorEntity currentEntity, final String schedulingPeriod) throws NiFiClientException, IOException {
    final ProcessorConfigDTO schedulingConfig = new ProcessorConfigDTO();
    schedulingConfig.setSchedulingPeriod(schedulingPeriod);

    return updateProcessorConfig(currentEntity, schedulingConfig);
}
/**
 * Sends a processor update containing only the given config.
 * The request reuses the ID and revision of {@code currentEntity}.
 *
 * @param currentEntity the processor to update
 * @param config the configuration to apply
 * @return the entity returned by the update call
 */
public ProcessorEntity updateProcessorConfig(final ProcessorEntity currentEntity, final ProcessorConfigDTO config) throws NiFiClientException, IOException {
    final ProcessorDTO component = new ProcessorDTO();
    component.setId(currentEntity.getId());
    component.setConfig(config);

    final ProcessorEntity request = new ProcessorEntity();
    request.setId(currentEntity.getId());
    request.setComponent(component);
    request.setRevision(currentEntity.getRevision());

    return nifiClient.getProcessorClient().updateProcessor(request);
}
/**
 * Auto-terminates a single relationship on the processor.
 *
 * @param currentEntity the processor to update
 * @param autoTerminatedRelationship the relationship name to auto-terminate
 * @return the entity returned by the update call
 */
public ProcessorEntity setAutoTerminatedRelationships(final ProcessorEntity currentEntity, final String autoTerminatedRelationship) throws NiFiClientException, IOException {
    final Set<String> relationships = Collections.singleton(autoTerminatedRelationship);
    return setAutoTerminatedRelationships(currentEntity, relationships);
}
/**
 * Updates the processor with a config that sets only the auto-terminated relationships.
 *
 * @param currentEntity the processor to update
 * @param autoTerminatedRelationships the relationship names to auto-terminate
 * @return the entity returned by the update call
 */
public ProcessorEntity setAutoTerminatedRelationships(final ProcessorEntity currentEntity, final Set<String> autoTerminatedRelationships) throws NiFiClientException, IOException {
    final ProcessorConfigDTO relationshipConfig = new ProcessorConfigDTO();
    relationshipConfig.setAutoTerminatedRelationships(autoTerminatedRelationships);

    return updateProcessorConfig(currentEntity, relationshipConfig);
}
/**
 * Blocks until the processor's validation status equals {@code ProcessorDTO.VALID}.
 *
 * @param id the processor ID
 */
public void waitForValidProcessor(String id) throws InterruptedException, IOException, NiFiClientException {
    final String expectedStatus = ProcessorDTO.VALID;
    waitForValidStatus(id, expectedStatus);
}
/**
 * Blocks until the processor's validation status equals {@code ProcessorDTO.INVALID}.
 *
 * @param id the processor ID
 */
public void waitForInvalidProcessor(String id) throws NiFiClientException, IOException, InterruptedException {
    final String expectedStatus = ProcessorDTO.INVALID;
    waitForValidStatus(id, expectedStatus);
}
/**
 * Polls the processor every 100 ms until its validation status equals the expected one.
 * While the processor reports an invalid status, its validation errors are logged.
 * There is no timeout.
 *
 * @param processorId the processor ID
 * @param expectedStatus the validation status to wait for
 */
public void waitForValidStatus(final String processorId, final String expectedStatus) throws NiFiClientException, IOException, InterruptedException {
    for (;;) {
        final ProcessorEntity fetched = nifiClient.getProcessorClient().getProcessor(processorId);
        final String currentStatus = fetched.getComponent().getValidationStatus();

        final boolean reached = expectedStatus.equals(currentStatus);
        if (reached) {
            return;
        }

        if ("Invalid".equalsIgnoreCase(currentStatus)) {
            logger.info("Processor with ID {} is currently invalid due to: {}", processorId, fetched.getComponent().getValidationErrors());
        }

        Thread.sleep(100L);
    }
}
/**
 * Blocks until the processor reports the state "RUNNING".
 *
 * @param processorId the processor ID
 */
public void waitForRunningProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException {
    final String runningState = "RUNNING";
    waitForProcessorState(processorId, runningState);
}
/**
 * Blocks until the processor reports the state "STOPPED" and has no active threads.
 *
 * @param processorId the processor ID
 */
public void waitForStoppedProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException {
    final String stoppedState = "STOPPED";
    waitForProcessorState(processorId, stoppedState);
}
/**
 * Polls the processor every 10 ms until it reaches the expected state.
 * When waiting for "RUNNING", a matching state is enough. For any other expected state the
 * method also waits until the aggregate snapshot reports zero active threads.
 * There is no timeout.
 *
 * @param processorId the processor ID
 * @param expectedState the state to wait for
 */
public void waitForProcessorState(final String processorId, final String expectedState) throws NiFiClientException, IOException, InterruptedException {
    for (;;) {
        final ProcessorEntity fetched = nifiClient.getProcessorClient().getProcessor(processorId);
        final String currentState = fetched.getComponent().getState();

        if (expectedState.equals(currentState)) {
            final boolean waitingForRunning = "RUNNING".equals(expectedState);
            if (waitingForRunning || fetched.getStatus().getAggregateSnapshot().getActiveThreadCount() == 0) {
                return;
            }
        }

        Thread.sleep(10L);
    }
}
/**
 * Requests that the controller service be put into the "ENABLED" state.
 * The request reuses the ID and revision of the given entity.
 *
 * @param entity the controller service to enable
 * @return the entity returned by the server
 */
public ControllerServiceEntity enableControllerService(final ControllerServiceEntity entity) throws NiFiClientException, IOException {
    final ControllerServiceRunStatusEntity enableRequest = new ControllerServiceRunStatusEntity();
    enableRequest.setRevision(entity.getRevision());
    enableRequest.setState("ENABLED");

    final String serviceId = entity.getId();
    return nifiClient.getControllerServicesClient().activateControllerService(serviceId, enableRequest);
}
/**
 * Requests that the controller service be put into the "DISABLED" state.
 * The request reuses the ID and revision of the given entity.
 *
 * @param entity the controller service to disable
 * @return the entity returned by the server
 */
public ControllerServiceEntity disableControllerService(final ControllerServiceEntity entity) throws NiFiClientException, IOException {
    final ControllerServiceRunStatusEntity disableRequest = new ControllerServiceRunStatusEntity();
    disableRequest.setRevision(entity.getRevision());
    disableRequest.setState("DISABLED");

    final String serviceId = entity.getId();
    return nifiClient.getControllerServicesClient().activateControllerService(serviceId, disableRequest);
}
/**
 * Polls the counters every 10 ms until the named counter equals the expected value.
 * There is no timeout.
 *
 * @param context text that a counter's context must contain for the counter to be considered
 * @param counterName the name of the counter to watch
 * @param expectedValue the value to wait for
 * @return the counter snapshot in which the expected value was observed
 */
public Map<String, Long> waitForCounter(final String context, final String counterName, final long expectedValue) throws NiFiClientException, IOException, InterruptedException {
    for (;;) {
        final Map<String, Long> snapshot = getCountersAsMap(context);
        final Long observed = snapshot.get(counterName);

        final boolean matches = observed != null && observed.longValue() == expectedValue;
        if (matches) {
            return snapshot;
        }

        Thread.sleep(10L);
    }
}
/**
 * Fetches the aggregate counters and returns, keyed by counter name, the values of those
 * whose context contains the given text.
 *
 * @param processorId text that a counter's context must contain
 * @return counter names mapped to their value counts
 */
public Map<String, Long> getCountersAsMap(final String processorId) throws NiFiClientException, IOException {
    final CountersSnapshotDTO aggregate = nifiClient.getCountersClient()
        .getCounters()
        .getCounters()
        .getAggregateSnapshot();

    return aggregate.getCounters().stream()
        .filter(counter -> counter.getContext().contains(processorId))
        .collect(Collectors.toMap(CounterDTO::getName, CounterDTO::getValueCount));
}
/**
 * Schedules the components of the given Process Group into the "RUNNING" state.
 *
 * @param groupId the Process Group ID
 * @return the entity returned by the server
 */
public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
    final ScheduleComponentsEntity startRequest = new ScheduleComponentsEntity();
    startRequest.setState("RUNNING");
    startRequest.setId(groupId);

    return nifiClient.getFlowClient().scheduleProcessGroupComponents(groupId, startRequest);
}
/**
 * Schedules the components of the given Process Group into the "STOPPED" state, then waits
 * for the group's processors to stop.
 *
 * @param groupId the Process Group ID
 * @return the entity returned by the scheduling call
 */
public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
    final ScheduleComponentsEntity stopRequest = new ScheduleComponentsEntity();
    stopRequest.setState("STOPPED");
    stopRequest.setId(groupId);

    final ScheduleComponentsEntity response = nifiClient.getFlowClient().scheduleProcessGroupComponents(groupId, stopRequest);
    waitForProcessorsStopped(groupId);
    return response;
}
/**
 * Waits for every processor directly inside the given Process Group to stop, then descends
 * into each child group through its component DTO.
 *
 * @param groupId the Process Group ID
 * @throws NiFiClientException if a request fails, or if the thread is interrupted while waiting;
 *         in the latter case the interrupt flag is restored and the interruption is the cause
 */
private void waitForProcessorsStopped(final String groupId) throws IOException, NiFiClientException {
    final ProcessGroupFlowEntity rootGroup = nifiClient.getFlowClient().getProcessGroup(groupId);
    final FlowDTO rootFlowDTO = rootGroup.getProcessGroupFlow().getFlow();
    for (final ProcessorEntity processor : rootFlowDTO.getProcessors()) {
        try {
            waitForStoppedProcessor(processor.getId());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the original interruption as the cause so the stack trace is not lost.
            throw new NiFiClientException("Interrupted while waiting for Processor with ID " + processor.getId() + " to stop", e);
        }
    }
    for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) {
        waitForProcessorsStopped(group.getComponent());
    }
}
/**
 * Waits for every processor listed in the group's contents to stop, recursing into the
 * child groups listed there. Returns immediately when the DTO carries no contents.
 *
 * @param group the Process Group DTO to inspect
 * @throws NiFiClientException if a request fails, or if the thread is interrupted while waiting;
 *         in the latter case the interrupt flag is restored and the interruption is the cause
 */
private void waitForProcessorsStopped(final ProcessGroupDTO group) throws IOException, NiFiClientException {
    final FlowSnippetDTO groupContents = group.getContents();
    if (groupContents == null) {
        return;
    }
    for (final ProcessorDTO processor : groupContents.getProcessors()) {
        try {
            waitForStoppedProcessor(processor.getId());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the original interruption as the cause so the stack trace is not lost.
            throw new NiFiClientException("Interrupted while waiting for Processor with ID " + processor.getId() + " to stop", e);
        }
    }
    for (final ProcessGroupDTO child : groupContents.getProcessGroups()) {
        waitForProcessorsStopped(child);
    }
}
/**
 * Stops transmission on every Remote Process Group in the given Process Group, then does
 * the same for each child Process Group.
 *
 * @param processGroupId the Process Group ID
 */
public void stopTransmitting(final String processGroupId) throws NiFiClientException, IOException {
    final FlowDTO flow = nifiClient.getFlowClient()
        .getProcessGroup(processGroupId)
        .getProcessGroupFlow()
        .getFlow();

    for (final RemoteProcessGroupEntity remoteGroup : flow.getRemoteProcessGroups()) {
        nifiClient.getRemoteProcessGroupClient().stopTransmitting(remoteGroup);
    }

    for (final ProcessGroupEntity child : flow.getProcessGroups()) {
        final String childId = child.getId();
        stopTransmitting(childId);
    }
}
/**
 * Requests that the controller services of the given Process Group be disabled and waits
 * until they are. When {@code recurse} is set, the same is done for each child group.
 *
 * @param groupId the Process Group ID
 * @param recurse whether to also process child Process Groups
 * @return the entity returned by the server for the given group (not for its children)
 */
public ActivateControllerServicesEntity disableControllerServices(final String groupId, final boolean recurse) throws NiFiClientException, IOException {
    final ActivateControllerServicesEntity disableRequest = new ActivateControllerServicesEntity();
    disableRequest.setState(ActivateControllerServicesEntity.STATE_DISABLED);
    disableRequest.setId(groupId);

    final ActivateControllerServicesEntity response = nifiClient.getFlowClient().activateControllerServices(disableRequest);
    waitForControllerSerivcesDisabled(groupId);

    if (!recurse) {
        return response;
    }

    final FlowDTO flow = nifiClient.getFlowClient().getProcessGroup(groupId).getProcessGroupFlow().getFlow();
    for (final ProcessGroupEntity child : flow.getProcessGroups()) {
        disableControllerServices(child.getId(), true);
    }
    return response;
}
/**
 * Blocks until the controller services of the group report the "DISABLED" state.
 *
 * @param groupId the Process Group ID
 * @param serviceIdsOfInterest IDs of the services to watch; when none are given, all services are watched
 */
public void waitForControllerSerivcesDisabled(final String groupId, final String... serviceIdsOfInterest) throws NiFiClientException, IOException {
    final List<String> watchedIds = Arrays.asList(serviceIdsOfInterest);
    waitForControllerServiceState(groupId, "DISABLED", watchedIds);
}
/**
 * Blocks until the controller services of the group report the "ENABLED" state.
 *
 * @param groupId the Process Group ID
 * @param serviceIdsOfInterest IDs of the services to watch; when none are given, all services are watched
 */
public void waitForControllerSerivcesEnabled(final String groupId, final String... serviceIdsOfInterest) throws NiFiClientException, IOException {
    final List<String> watchedIds = Arrays.asList(serviceIdsOfInterest);
    waitForControllerServiceState(groupId, "ENABLED", watchedIds);
}
/**
 * Polls every 500 ms until all watched controller services of the group report the desired
 * run status. Progress is reported through the class logger. There is no timeout.
 *
 * @param groupId the Process Group ID
 * @param desiredState the run status to wait for
 * @param serviceIdsOfInterest IDs of the services to watch; null or empty watches all services
 * @throws AssertionError if the thread is interrupted while waiting (the interrupt flag is restored first)
 */
public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
    while (true) {
        final List<ControllerServiceEntity> pendingServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
        if (pendingServices.isEmpty()) {
            logger.info("All Controller Services in Process Group {} now have desired state of {}", groupId, desiredState);
            return;
        }

        // Only the first lagging service is reported to keep the log short.
        final ControllerServiceEntity entity = pendingServices.get(0);
        logger.info("Controller Service with ID {} and type {} has a State of {} while waiting for state of {}; will wait 500 millis and check again",
            entity.getId(), entity.getComponent().getType(), entity.getComponent().getState(), desiredState);

        try {
            Thread.sleep(500L);
        } catch (final InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for Controller Services to reach the desired state", e);
            Assert.fail(e.toString());
        }
    }
}
/**
 * Lists the controller services of the group whose run status differs from the desired one.
 *
 * @param groupId the Process Group ID
 * @param desiredState the run status to compare against
 * @param serviceIds IDs to restrict the check to; null or empty means every service is checked
 * @return the services that are not in the desired state
 */
public List<ControllerServiceEntity> getControllerServicesNotInState(final String groupId, final String desiredState, final Collection<String> serviceIds) throws NiFiClientException, IOException {
    final ControllerServicesEntity allServices = nifiClient.getFlowClient().getControllerServices(groupId);

    return allServices.getControllerServices().stream()
        .filter(service -> {
            final boolean ofInterest = serviceIds == null || serviceIds.isEmpty() || serviceIds.contains(service.getId());
            return ofInterest && !desiredState.equals(service.getStatus().getRunStatus());
        })
        .collect(Collectors.toList());
}
/**
 * Deletes the contents of the given Process Group: connections (emptying any queue that
 * still holds FlowFiles first), processors, controller services, Remote Process Groups,
 * input ports and output ports, in that order. The same is then done for each child
 * Process Group; this method does not delete the child groups themselves.
 *
 * @param groupId the Process Group ID
 */
public void deleteAll(final String groupId) throws NiFiClientException, IOException {
    final FlowDTO flow = nifiClient.getFlowClient().getProcessGroup(groupId).getProcessGroupFlow().getFlow();

    // Connections: drop queued FlowFiles where present, then remove the connection
    for (final ConnectionEntity connection : flow.getConnections()) {
        final ConnectionStatusEntity connectionStatus = nifiClient.getFlowClient().getConnectionStatus(connection.getId(), false);
        final boolean hasQueuedFlowFiles = connectionStatus.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() > 0;
        if (hasQueuedFlowFiles) {
            emptyQueue(connection.getId());
        }

        nifiClient.getConnectionClient().deleteConnection(connection);
    }

    // Processors
    for (final ProcessorEntity processor : flow.getProcessors()) {
        nifiClient.getProcessorClient().deleteProcessor(processor);
    }

    // Controller Services
    final ControllerServicesEntity services = nifiClient.getFlowClient().getControllerServices(groupId);
    for (final ControllerServiceEntity service : services.getControllerServices()) {
        nifiClient.getControllerServicesClient().deleteControllerService(service);
    }

    // Remote Process Groups
    for (final RemoteProcessGroupEntity remoteGroup : flow.getRemoteProcessGroups()) {
        nifiClient.getRemoteProcessGroupClient().deleteRemoteProcessGroup(remoteGroup);
    }

    // Input Ports
    for (final PortEntity inputPort : flow.getInputPorts()) {
        nifiClient.getInputPortClient().deleteInputPort(inputPort);
    }

    // Output Ports
    for (final PortEntity outputPort : flow.getOutputPorts()) {
        nifiClient.getOutputPortClient().deleteOutputPort(outputPort);
    }

    // Child Process Groups
    for (final ProcessGroupEntity child : flow.getProcessGroups()) {
        deleteAll(child.getId());
    }
}
/**
 * Connects two ports using the port relationship.
 *
 * @param source the source port
 * @param destination the destination port
 * @return the entity returned by the server
 * @throws IllegalArgumentException if the source is an Output Port and the destination an Input Port,
 *         because the Process Group ID must then be given explicitly
 */
public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination) throws NiFiClientException, IOException {
    if ("OUTPUT_PORT".equals(source.getPortType())) {
        if ("INPUT_PORT".equals(destination.getPortType())) {
            throw new IllegalArgumentException("In order to connect an Output Port to an Input Port, the Process Group ID must be specified");
        }
    }

    final Set<String> portRelationship = Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName());
    return createConnection(source, destination, portRelationship);
}
/**
 * Connects two ports using the port relationship, creating the connection in the given Process Group.
 *
 * @param source the source port
 * @param destination the destination port
 * @param connectionGroupId the ID of the Process Group that will hold the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination, final String connectionGroupId) throws NiFiClientException, IOException {
    final ConnectableDTO from = createConnectableDTO(source);
    final ConnectableDTO to = createConnectableDTO(destination);
    final Set<String> portRelationship = Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName());
    return createConnection(from, to, portRelationship, connectionGroupId);
}
/**
 * Connects a port to a processor using the port relationship.
 *
 * @param source the source port
 * @param destination the destination processor
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination) throws NiFiClientException, IOException {
    final Set<String> portRelationship = Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName());
    return createConnection(source, destination, portRelationship);
}
/**
 * Connects a processor to a port for a single relationship.
 *
 * @param source the source processor
 * @param destination the destination port
 * @param relationship the relationship to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final String relationship) throws NiFiClientException, IOException {
    final Set<String> selectedRelationships = Collections.singleton(relationship);
    return createConnection(source, destination, selectedRelationships);
}
/**
 * Connects two processors for a single relationship.
 *
 * @param source the source processor
 * @param destination the destination processor
 * @param relationship the relationship to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final ProcessorEntity source, final ProcessorEntity destination, final String relationship) throws NiFiClientException, IOException {
    final Set<String> selectedRelationships = Collections.singleton(relationship);
    return createConnection(source, destination, selectedRelationships);
}
/**
 * Connects two connectables for a single relationship.
 *
 * @param source the source connectable
 * @param destination the destination connectable
 * @param relationship the relationship to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final ConnectableDTO source, final ConnectableDTO destination, final String relationship) throws NiFiClientException, IOException {
    final Set<String> selectedRelationships = Collections.singleton(relationship);
    return createConnection(source, destination, selectedRelationships);
}
/**
 * Connects two processors for the given relationships.
 *
 * @param source the source processor
 * @param destination the destination processor
 * @param relationships the relationships to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final ProcessorEntity source, final ProcessorEntity destination, final Set<String> relationships) throws NiFiClientException, IOException {
    final ConnectableDTO from = createConnectableDTO(source);
    final ConnectableDTO to = createConnectableDTO(destination);
    return createConnection(from, to, relationships);
}
/**
 * Connects a processor to a port for the given relationships.
 *
 * @param source the source processor
 * @param destination the destination port
 * @param relationships the relationships to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final Set<String> relationships) throws NiFiClientException, IOException {
    final ConnectableDTO from = createConnectableDTO(source);
    final ConnectableDTO to = createConnectableDTO(destination);
    return createConnection(from, to, relationships);
}
/**
 * Connects two ports for the given relationships.
 *
 * @param source the source port
 * @param destination the destination port
 * @param relationships the relationships to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination, final Set<String> relationships) throws NiFiClientException, IOException {
    final ConnectableDTO from = createConnectableDTO(source);
    final ConnectableDTO to = createConnectableDTO(destination);
    return createConnection(from, to, relationships);
}
/**
 * Connects a port to a processor for the given relationships.
 *
 * @param source the source port
 * @param destination the destination processor
 * @param relationships the relationships to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination, final Set<String> relationships) throws NiFiClientException, IOException {
    final ConnectableDTO from = createConnectableDTO(source);
    final ConnectableDTO to = createConnectableDTO(destination);
    return createConnection(from, to, relationships);
}
/**
 * Connects two connectables, choosing the Process Group for the connection automatically:
 * the destination's group when the source is an Output Port, otherwise the source's group.
 *
 * @param source the source connectable
 * @param destination the destination connectable
 * @param relationships the relationships to route through the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final ConnectableDTO source, final ConnectableDTO destination, final Set<String> relationships) throws NiFiClientException, IOException {
    final String owningGroupId;
    if ("OUTPUT_PORT".equals(source.getType())) {
        owningGroupId = destination.getGroupId();
    } else {
        owningGroupId = source.getGroupId();
    }

    return createConnection(source, destination, relationships, owningGroupId);
}
/**
 * Creates a connection between two connectables in the given Process Group.
 * The source and destination ID, group and type are set on the entity, and the full
 * connectables on its component.
 *
 * @param source the source connectable
 * @param destination the destination connectable
 * @param relationships the relationships to route through the connection
 * @param connectionGroupId the ID of the Process Group that will hold the connection
 * @return the entity returned by the server
 */
public ConnectionEntity createConnection(final ConnectableDTO source, final ConnectableDTO destination, final Set<String> relationships, final String connectionGroupId)
        throws NiFiClientException, IOException {
    final ConnectionDTO component = new ConnectionDTO();
    component.setParentGroupId(connectionGroupId);
    component.setSource(source);
    component.setDestination(destination);
    component.setSelectedRelationships(relationships);

    final ConnectionEntity request = new ConnectionEntity();
    request.setRevision(createNewRevision());
    request.setComponent(component);

    request.setSourceId(source.getId());
    request.setSourceGroupId(source.getGroupId());
    request.setSourceType(source.getType());

    request.setDestinationId(destination.getId());
    request.setDestinationGroupId(destination.getGroupId());
    request.setDestinationType(destination.getType());

    return nifiClient.getConnectionClient().createConnection(connectionGroupId, request);
}
/**
 * Builds a connectable of type "PROCESSOR" describing the given processor.
 *
 * @param processor the processor to describe
 * @return the connectable DTO
 */
public ConnectableDTO createConnectableDTO(final ProcessorEntity processor) {
    final ConnectableDTO connectable = new ConnectableDTO();
    connectable.setGroupId(processor.getComponent().getParentGroupId());
    connectable.setId(processor.getId());
    connectable.setName(processor.getComponent().getName());

    final boolean running = "RUNNING".equalsIgnoreCase(processor.getComponent().getState());
    connectable.setRunning(running);
    connectable.setType("PROCESSOR");
    return connectable;
}
/**
 * Builds a connectable describing the given port; the connectable's type is the port's type.
 *
 * @param port the port to describe
 * @return the connectable DTO
 */
public ConnectableDTO createConnectableDTO(final PortEntity port) {
    final ConnectableDTO connectable = new ConnectableDTO();
    connectable.setGroupId(port.getComponent().getParentGroupId());
    connectable.setId(port.getId());
    connectable.setName(port.getComponent().getName());

    final boolean running = "RUNNING".equalsIgnoreCase(port.getComponent().getState());
    connectable.setRunning(running);
    connectable.setType(port.getPortType());
    return connectable;
}
/**
 * Reads the queued FlowFile count and byte count from the connection's aggregate status snapshot.
 *
 * @param connectionId the connection ID
 * @return the queue size of the connection
 * @throws RuntimeException wrapping any exception raised while obtaining the status
 */
public QueueSize getQueueSize(String connectionId) {
    try {
        final ConnectionStatusSnapshotDTO snapshot = nifiClient.getFlowClient()
            .getConnectionStatus(connectionId, false)
            .getConnectionStatus()
            .getAggregateSnapshot();

        return new QueueSize(snapshot.getFlowFilesQueued(), snapshot.getBytesQueued());
    } catch (Exception e) {
        throw new RuntimeException("Failed to obtain queue size for connection with ID " + connectionId, e);
    }
}
/**
 * Updates the load-balancing configuration of an existing connection.
 * Only the connection ID and the three load-balancing settings are populated on the DTO that is submitted.
 *
 * @param connectionEntity the connection to update; its ID and revision are reused for the update request
 * @param strategy the load balance strategy to apply
 * @param compression the load balance compression to apply
 * @param loadBalanceAttribute the partition attribute to set on the connection
 * @return the entity returned by the connection client for the update
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public ConnectionEntity updateConnectionLoadBalancing(final ConnectionEntity connectionEntity, final LoadBalanceStrategy strategy, final LoadBalanceCompression compression,
                                                      final String loadBalanceAttribute) throws NiFiClientException, IOException {
    final String connectionId = connectionEntity.getId();

    final ConnectionDTO loadBalanceSettings = new ConnectionDTO();
    loadBalanceSettings.setId(connectionId);
    loadBalanceSettings.setLoadBalanceStrategy(strategy.name());
    loadBalanceSettings.setLoadBalanceCompression(compression.name());
    loadBalanceSettings.setLoadBalancePartitionAttribute(loadBalanceAttribute);

    final ConnectionEntity request = new ConnectionEntity();
    request.setId(connectionId);
    request.setRevision(connectionEntity.getRevision());
    request.setComponent(loadBalanceSettings);

    return nifiClient.getConnectionClient().updateConnection(request);
}
/**
 * Empties the queue of the given connection and waits for the drop request to reach 100 percent completion,
 * polling every 10 milliseconds. The drop request is always deleted before this method returns or throws.
 *
 * @param connectionId the ID of the connection whose queue should be emptied
 * @return the entity returned when the drop request is deleted, or {@code null} if the thread was interrupted while waiting
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public DropRequestEntity emptyQueue(final String connectionId) throws NiFiClientException, IOException {
    final ConnectionClient client = nifiClient.getConnectionClient();
    DropRequestEntity dropRequest = client.emptyQueue(connectionId);

    try {
        while (dropRequest.getDropRequest().getPercentCompleted() < 100) {
            try {
                Thread.sleep(10L);
            } catch (final InterruptedException interrupted) {
                // Restore the interrupt flag; the finally block still removes the drop request.
                Thread.currentThread().interrupt();
                return null;
            }

            final String pendingRequestId = dropRequest.getDropRequest().getId();
            dropRequest = client.getDropRequest(connectionId, pendingRequestId);
        }
    } finally {
        final String latestRequestId = dropRequest.getDropRequest().getId();
        dropRequest = client.deleteDropRequest(connectionId, latestRequestId);
    }

    return dropRequest;
}
/**
 * Creates a Remote Process Group in the given parent group that targets {@code http://localhost:5671}.
 * The group's name is set to its target URI.
 *
 * @param parentGroupId the ID of the Process Group in which to create the Remote Process Group
 * @param transportProtocol the Site-to-Site transport protocol to configure
 * @return the entity returned by the Remote Process Group client
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public RemoteProcessGroupEntity createRPG(final String parentGroupId, final SiteToSiteTransportProtocol transportProtocol) throws NiFiClientException, IOException {
    final RemoteProcessGroupDTO rpgDto = new RemoteProcessGroupDTO();
    rpgDto.setTargetUri("http://localhost:5671");
    rpgDto.setTransportProtocol(transportProtocol.name());
    // Name the group after whatever target URI the DTO reports.
    rpgDto.setName(rpgDto.getTargetUri());

    final RemoteProcessGroupEntity request = new RemoteProcessGroupEntity();
    request.setRevision(createNewRevision());
    request.setComponent(rpgDto);

    return nifiClient.getRemoteProcessGroupClient().createRemoteProcessGroup(parentGroupId, request);
}
/**
 * Creates an Input Port with remote access allowed in the given Process Group.
 *
 * @param parentGroupId the ID of the Process Group in which to create the port
 * @param portName the name of the new port
 * @return the entity returned by the input port client
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public PortEntity createRemoteInputPort(final String parentGroupId, final String portName) throws NiFiClientException, IOException {
    final PortDTO remotePort = new PortDTO();
    remotePort.setAllowRemoteAccess(true);
    remotePort.setName(portName);

    final PortEntity request = new PortEntity();
    request.setRevision(createNewRevision());
    request.setComponent(remotePort);

    return nifiClient.getInputPortClient().createInputPort(parentGroupId, request);
}
/**
 * Requests that the node with the given ID transition to the {@code DISCONNECTING} state.
 *
 * @param nodeId the ID of the node to disconnect
 * @return the node entity returned by the state update
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public NodeEntity disconnectNode(final String nodeId) throws NiFiClientException, IOException {
    final String desiredState = NodeConnectionState.DISCONNECTING.name();
    return updateNodeState(nodeId, desiredState);
}
/**
 * Requests that the node with the given ID transition to the {@code CONNECTING} state.
 *
 * @param nodeId the ID of the node to connect
 * @return the node entity returned by the state update
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public NodeEntity connectNode(final String nodeId) throws NiFiClientException, IOException {
    final String desiredState = NodeConnectionState.CONNECTING.name();
    return updateNodeState(nodeId, desiredState);
}
/**
 * Requests that the node with the given ID transition to the {@code OFFLOADING} state.
 *
 * @param nodeId the ID of the node to offload
 * @return the node entity returned by the state update
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public NodeEntity offloadNode(final String nodeId) throws NiFiClientException, IOException {
    final String desiredState = NodeConnectionState.OFFLOADING.name();
    return updateNodeState(nodeId, desiredState);
}
/**
 * Submits a node entity carrying the requested status for the given node.
 *
 * @param nodeId the ID of the node to update
 * @param state the status value to set on the node
 * @return the node entity returned by the controller client
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
private NodeEntity updateNodeState(final String nodeId, final String state) throws NiFiClientException, IOException {
    final NodeDTO requestedNode = new NodeDTO();
    requestedNode.setStatus(state);
    requestedNode.setNodeId(nodeId);

    final NodeEntity request = new NodeEntity();
    request.setNode(requestedNode);

    // NOTE(review): every state change goes through the controller client's disconnectNode call; presumably that call
    // simply submits the node entity and the status inside it decides the transition - confirm against the client.
    return nifiClient.getControllerClient().disconnectNode(nodeId, request);
}
/**
 * Requests a listing of the given connection's queue, polls every 10 milliseconds until the listing is finished,
 * deletes the listing request, and returns the completed listing.
 *
 * @param connectionId the ID of the connection whose queue should be listed
 * @return the last listing request entity obtained before deletion (the one that was reported as finished)
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 * @throws AssertionError if the thread is interrupted while waiting for the listing to finish
 */
public ListingRequestEntity performQueueListing(final String connectionId) throws NiFiClientException, IOException {
    final ConnectionClient connectionClient = nifiClient.getConnectionClient();

    try {
        ListingRequestEntity listingRequestEntity = connectionClient.listQueue(connectionId);

        // Compare by value rather than by reference so that any Boolean instance holding 'true' ends the loop.
        while (!Boolean.TRUE.equals(listingRequestEntity.getListingRequest().getFinished())) {
            Thread.sleep(10L);
            listingRequestEntity = connectionClient.getListingRequest(connectionId, listingRequestEntity.getListingRequest().getId());
        }

        // Delete the listing. Return the previously obtained listing, not the one from the deletion, because the listing request that is returned from deleting the listing does not contain the
        // FlowFile Summaries.
        connectionClient.deleteListingRequest(connectionId, listingRequestEntity.getListingRequest().getId());
        return listingRequestEntity;
    } catch (final InterruptedException e) {
        // Restore the interrupt flag so that callers further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        Assert.fail("Interrupted while waiting for queue listing to complete for connection with ID " + connectionId);
        return null;
    }
}
/**
 * Lists the queue of the given connection and fetches the FlowFile found at the given position in that listing.
 * The cluster node ID from the listing summary is copied onto the returned FlowFile.
 *
 * @param connectionId the ID of the connection whose queue holds the FlowFile
 * @param flowFileIndex the zero-based position of the FlowFile within the queue listing
 * @return the FlowFile entity returned by the connection client
 * @throws IllegalArgumentException if the index is not smaller than the number of FlowFiles listed
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public FlowFileEntity getQueueFlowFile(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
    final List<FlowFileSummaryDTO> summaries = performQueueListing(connectionId).getListingRequest().getFlowFileSummaries();

    final int queuedCount = summaries.size();
    if (flowFileIndex >= queuedCount) {
        throw new IllegalArgumentException("Cannot retrieve FlowFile with index " + flowFileIndex + " because queue only has " + queuedCount + " FlowFiles");
    }

    final FlowFileSummaryDTO selected = summaries.get(flowFileIndex);
    final String clusterNodeId = selected.getClusterNodeId();

    final FlowFileEntity flowFile = nifiClient.getConnectionClient().getFlowFile(connectionId, selected.getUuid(), clusterNodeId);
    flowFile.getFlowFile().setClusterNodeId(clusterNodeId);
    return flowFile;
}
/**
 * Reads the full content of the FlowFile at the given position in the connection's queue and decodes it as UTF-8.
 *
 * @param connectionId the ID of the connection whose queue holds the FlowFile
 * @param flowFileIndex the zero-based position of the FlowFile within the queue listing
 * @return the FlowFile content decoded as a UTF-8 string
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public String getFlowFileContentAsUtf8(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
    return new String(getFlowFileContentAsByteArray(connectionId, flowFileIndex), StandardCharsets.UTF_8);
}
/**
 * Reads the full content of the FlowFile at the given position in the connection's queue into memory.
 *
 * @param connectionId the ID of the connection whose queue holds the FlowFile
 * @param flowFileIndex the zero-based position of the FlowFile within the queue listing
 * @return the FlowFile content as a byte array
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails or the content cannot be read
 */
public byte[] getFlowFileContentAsByteArray(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
    // An in-memory buffer holds no external resource, so only the content stream needs to be closed.
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    try (final InputStream content = getFlowFileContent(connectionId, flowFileIndex)) {
        StreamUtils.copy(content, buffer);
    }

    return buffer.toByteArray();
}
/**
 * Lists the queue of the given connection and opens a stream over the content of the FlowFile found at the given
 * position in that listing. The caller is responsible for closing the returned stream.
 *
 * @param connectionId the ID of the connection whose queue holds the FlowFile
 * @param flowFileIndex the zero-based position of the FlowFile within the queue listing
 * @return a stream over the FlowFile's content, as returned by the connection client
 * @throws IllegalArgumentException if the index is not smaller than the number of FlowFiles listed
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public InputStream getFlowFileContent(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
    final ListingRequestEntity listing = performQueueListing(connectionId);
    final List<FlowFileSummaryDTO> flowFileSummaries = listing.getListingRequest().getFlowFileSummaries();
    if (flowFileIndex >= flowFileSummaries.size()) {
        throw new IllegalArgumentException("Cannot retrieve FlowFile with index " + flowFileIndex + " because queue only has " + flowFileSummaries.size() + " FlowFiles");
    }

    final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex);
    final String uuid = flowFileSummary.getUuid();
    final String nodeId = flowFileSummary.getClusterNodeId();

    // The listing summary already provides the UUID and node ID needed to fetch the content, so the FlowFile
    // entity itself is not requested here; its result was previously fetched and then discarded.
    return nifiClient.getConnectionClient().getFlowFileContent(connectionId, uuid, nodeId);
}
/**
 * Submits an update of the given Process Group's variable registry, polls every 100 milliseconds until the update
 * request is complete, and then deletes the update request. The update request is deleted even when the update
 * fails or the wait is interrupted, so that no request is left behind on the server.
 *
 * @param processGroup the Process Group whose variables should be updated; its ID and revision are used for the request
 * @param variables the variable names and values to submit
 * @return the last update request entity obtained while polling (the one that was reported as complete)
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 * @throws AssertionError if the wait is interrupted or the completed request carries a failure reason
 */
public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map<String, String> variables) throws NiFiClientException, IOException {
    final String groupId = processGroup.getId();

    final Set<VariableEntity> variableEntities = new HashSet<>();
    for (final Map.Entry<String, String> entry : variables.entrySet()) {
        final VariableDTO dto = new VariableDTO();
        dto.setName(entry.getKey());
        dto.setValue(entry.getValue());
        dto.setProcessGroupId(groupId);

        final VariableEntity entity = new VariableEntity();
        entity.setVariable(dto);
        variableEntities.add(entity);
    }

    final VariableRegistryDTO variableRegistryDto = new VariableRegistryDTO();
    variableRegistryDto.setProcessGroupId(groupId);
    variableRegistryDto.setVariables(variableEntities);

    final VariableRegistryEntity registryEntity = new VariableRegistryEntity();
    registryEntity.setProcessGroupRevision(processGroup.getRevision());
    registryEntity.setVariableRegistry(variableRegistryDto);

    VariableRegistryUpdateRequestEntity updateRequestEntity = nifiClient.getProcessGroupClient().updateVariableRegistry(groupId, registryEntity);
    try {
        while (!updateRequestEntity.getRequest().isComplete()) {
            try {
                Thread.sleep(100L);
            } catch (final InterruptedException ie) {
                // Restore the interrupt flag before failing so the interruption is not lost.
                Thread.currentThread().interrupt();
                Assert.fail("Interrupted while waiting for variable registry to update");
                return null;
            }

            updateRequestEntity = nifiClient.getProcessGroupClient().getVariableRegistryUpdateRequest(groupId, updateRequestEntity.getRequest().getRequestId());
        }
    } finally {
        // Always remove the update request, including when polling fails or is interrupted.
        nifiClient.getProcessGroupClient().deleteVariableRegistryUpdateRequest(groupId, updateRequestEntity.getRequest().getRequestId());
    }

    // Checked only after cleanup: Assert.fail throws, which previously skipped deletion of the request.
    final String failureReason = updateRequestEntity.getRequest().getFailureReason();
    if (failureReason != null) {
        Assert.fail("Failed to update Variable Registry due to: " + failureReason);
    }

    return updateRequestEntity;
}
/**
 * Creates a Process Group with the given name inside the given parent group.
 *
 * @param name the name of the new Process Group
 * @param parentGroupId the ID of the Process Group that will contain the new group
 * @return the entity returned by the process group client
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public ProcessGroupEntity createProcessGroup(final String name, final String parentGroupId) throws NiFiClientException, IOException {
    final ProcessGroupDTO groupDto = new ProcessGroupDTO();
    groupDto.setParentGroupId(parentGroupId);
    groupDto.setName(name);

    final ProcessGroupEntity request = new ProcessGroupEntity();
    request.setRevision(createNewRevision());
    request.setComponent(groupDto);

    return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, request);
}
/**
 * Creates an Input Port with the given name inside the given Process Group.
 *
 * @param name the name of the new port
 * @param groupId the ID of the Process Group that will contain the port
 * @return the entity returned by the input port client
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public PortEntity createInputPort(final String name, final String groupId) throws NiFiClientException, IOException {
    final PortDTO portDto = new PortDTO();
    portDto.setParentGroupId(groupId);
    portDto.setName(name);

    final PortEntity request = new PortEntity();
    request.setRevision(createNewRevision());
    request.setComponent(portDto);

    return nifiClient.getInputPortClient().createInputPort(groupId, request);
}
/**
 * Creates an Output Port with the given name inside the given Process Group.
 *
 * @param name the name of the new port
 * @param groupId the ID of the Process Group that will contain the port
 * @return the entity returned by the output port client
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public PortEntity createOutputPort(final String name, final String groupId) throws NiFiClientException, IOException {
    final PortDTO portDto = new PortDTO();
    portDto.setParentGroupId(groupId);
    portDto.setName(name);

    final PortEntity request = new PortEntity();
    request.setRevision(createNewRevision());
    request.setComponent(portDto);

    return nifiClient.getOutputPortClient().createOutputPort(groupId, request);
}
/**
 * Submits a non-summarized provenance query limited to 1000 results, waits for it to complete, and deletes the
 * query from the server. The query is deleted even when waiting fails or is interrupted, so that no query is left
 * behind on the server.
 *
 * @param searchTerms the search terms to apply, keyed by searchable field; each key is converted to its searchable field name
 * @param startTime the start of the time range as epoch milliseconds, or {@code null} for no start date
 * @param endTime the end of the time range as epoch milliseconds, or {@code null} for no end date
 * @return the provenance entity obtained once the query was reported as finished
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 * @throws AssertionError if the thread is interrupted while waiting for the query to complete
 */
public ProvenanceEntity queryProvenance(final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException {
    final Map<String, ProvenanceSearchValueDTO> searchTermsAsStrings = searchTerms.entrySet().stream()
        .collect(Collectors.toMap(entry -> entry.getKey().getSearchableFieldName(), Map.Entry::getValue));

    final ProvenanceRequestDTO requestDto = new ProvenanceRequestDTO();
    requestDto.setSearchTerms(searchTermsAsStrings);
    requestDto.setSummarize(false);
    requestDto.setStartDate(startTime == null ? null : new Date(startTime));
    requestDto.setEndDate(endTime == null ? null : new Date(endTime));
    requestDto.setMaxResults(1000);

    final ProvenanceDTO dto = new ProvenanceDTO();
    dto.setRequest(requestDto);
    dto.setSubmissionTime(new Date());

    final ProvenanceEntity entity = new ProvenanceEntity();
    entity.setProvenance(dto);

    ProvenanceEntity responseEntity = nifiClient.getProvenanceClient().submitProvenanceQuery(entity);
    try {
        responseEntity = waitForComplete(responseEntity);
    } catch (final InterruptedException ie) {
        // Restore the interrupt flag before failing so the interruption is not lost.
        Thread.currentThread().interrupt();
        Assert.fail("Interrupted while waiting for Provenance Query to complete");
    } finally {
        // Always remove the query, including when waiting fails or is interrupted.
        nifiClient.getProvenanceClient().deleteProvenanceQuery(responseEntity.getProvenance().getId());
    }

    return responseEntity;
}
/**
 * Polls the given provenance query every 100 milliseconds until it is reported as finished.
 *
 * @param entity the provenance query to wait for; its ID is used for every poll
 * @return the given entity if it is already finished, otherwise the first polled entity that is reported as finished
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 * @throws NiFiClientException if the client reports a failure
 * @throws IOException if communication with NiFi fails
 */
public ProvenanceEntity waitForComplete(final ProvenanceEntity entity) throws InterruptedException, NiFiClientException, IOException {
    ProvenanceEntity current = entity;

    // Compare by value rather than by reference so that any Boolean instance holding 'true' ends the loop.
    while (!Boolean.TRUE.equals(current.getProvenance().isFinished())) {
        Thread.sleep(100L);
        current = nifiClient.getProvenanceClient().getProvenanceQuery(entity.getProvenance().getId());
    }

    return current;
}
}