blob: 333adf8e15041b9fb9fbe724489cfa18d80db403 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.schema;
import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
public class ConfigSchema extends BaseSchema implements WritableSchema, ConvertableSchema<ConfigSchema> {
public static final int CONFIG_VERSION = 2;
public static final String VERSION = "MiNiFi Config Version";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS = "Found the following duplicate input port ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS = "Found the following duplicate output port ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in more than one Processor(s), Input Port(s), Output Port(s) and/or Remote Input Port(s): ";
public static final String CONNECTION_WITH_ID = "Connection with id ";
public static final String HAS_INVALID_SOURCE_ID = " has invalid source id ";
public static final String HAS_INVALID_DESTINATION_ID = " has invalid destination id ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS = "Found the following duplicate funnel ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
public static String TOP_LEVEL_NAME = "top level";
private FlowControllerSchema flowControllerProperties;
private CorePropertiesSchema coreProperties;
private FlowFileRepositorySchema flowfileRepositoryProperties;
private ContentRepositorySchema contentRepositoryProperties;
private ComponentStatusRepositorySchema componentStatusRepositoryProperties;
private SecurityPropertiesSchema securityProperties;
private ProcessGroupSchema processGroupSchema;
private ProvenanceReportingSchema provenanceReportingProperties;
private ProvenanceRepositorySchema provenanceRepositorySchema;
public ConfigSchema(Map map) {
this(map, Collections.emptyList());
}
public ConfigSchema(Map map, List<String> validationIssues) {
validationIssues.stream().forEach(this::addValidationIssue);
flowControllerProperties = getMapAsType(map, FLOW_CONTROLLER_PROPS_KEY, FlowControllerSchema.class, TOP_LEVEL_NAME, true);
coreProperties = getMapAsType(map, CORE_PROPS_KEY, CorePropertiesSchema.class, TOP_LEVEL_NAME, false);
flowfileRepositoryProperties = getMapAsType(map, FLOWFILE_REPO_KEY, FlowFileRepositorySchema.class, TOP_LEVEL_NAME, false);
contentRepositoryProperties = getMapAsType(map, CONTENT_REPO_KEY, ContentRepositorySchema.class, TOP_LEVEL_NAME, false);
provenanceRepositorySchema = getMapAsType(map, PROVENANCE_REPO_KEY, ProvenanceRepositorySchema.class, TOP_LEVEL_NAME, false);
componentStatusRepositoryProperties = getMapAsType(map, COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, TOP_LEVEL_NAME, false);
securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false);
processGroupSchema = new ProcessGroupSchema(map, TOP_LEVEL_NAME);
provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false);
addIssuesIfNotNull(flowControllerProperties);
addIssuesIfNotNull(coreProperties);
addIssuesIfNotNull(flowfileRepositoryProperties);
addIssuesIfNotNull(contentRepositoryProperties);
addIssuesIfNotNull(componentStatusRepositoryProperties);
addIssuesIfNotNull(securityProperties);
addIssuesIfNotNull(processGroupSchema);
addIssuesIfNotNull(provenanceReportingProperties);
addIssuesIfNotNull(provenanceRepositorySchema);
List<ProcessGroupSchema> allProcessGroups = getAllProcessGroups(processGroupSchema);
List<ConnectionSchema> allConnectionSchemas = allProcessGroups.stream().flatMap(p -> p.getConnections().stream()).collect(Collectors.toList());
List<RemoteProcessingGroupSchema> allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList());
List<String> allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
List<String> allFunnelIds = allProcessGroups.stream().flatMap(p -> p.getFunnels().stream()).map(FunnelSchema::getId).collect(Collectors.toList());
List<String> allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
List<String> allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList());
List<String> allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
.flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList());
List<String> allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS, allFunnelIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS, allInputPortIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
// Potential connection sources and destinations need to have unique ids
OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds),
new HashSet<>(allFunnelIds));
if (overlapResults.duplicates.size() > 0) {
addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", ")));
}
allConnectionSchemas.forEach(c -> {
String destinationId = c.getDestinationId();
if (!StringUtil.isNullOrEmpty(destinationId) && !overlapResults.seen.contains(destinationId)) {
addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_DESTINATION_ID + destinationId);
}
String sourceId = c.getSourceId();
if (!StringUtil.isNullOrEmpty(sourceId) && !overlapResults.seen.contains(sourceId)) {
addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_SOURCE_ID + sourceId);
}
});
}
protected static <T> OverlapResults<T> findOverlap(Collection<T>... collections) {
Set<T> seen = new HashSet<>();
return new OverlapResults<>(seen, Arrays.stream(collections).flatMap(c -> c.stream()).sequential().filter(s -> !seen.add(s)).collect(Collectors.toSet()));
}
public static List<ProcessGroupSchema> getAllProcessGroups(ProcessGroupSchema processGroupSchema) {
List<ProcessGroupSchema> result = new ArrayList<>();
addProcessGroups(processGroupSchema, result);
return result;
}
private static void addProcessGroups(ProcessGroupSchema processGroupSchema, List<ProcessGroupSchema> result) {
result.add(processGroupSchema);
processGroupSchema.getProcessGroupSchemas().forEach(p -> addProcessGroups(p, result));
}
public Map<String, Object> toMap() {
Map<String, Object> result = mapSupplier.get();
result.put(VERSION, getVersion());
putIfNotNull(result, FLOW_CONTROLLER_PROPS_KEY, flowControllerProperties);
putIfNotNull(result, CORE_PROPS_KEY, coreProperties);
putIfNotNull(result, FLOWFILE_REPO_KEY, flowfileRepositoryProperties);
putIfNotNull(result, CONTENT_REPO_KEY, contentRepositoryProperties);
putIfNotNull(result, PROVENANCE_REPO_KEY, provenanceRepositorySchema);
putIfNotNull(result, COMPONENT_STATUS_REPO_KEY, componentStatusRepositoryProperties);
putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties);
result.putAll(processGroupSchema.toMap());
putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties);
return result;
}
public FlowControllerSchema getFlowControllerProperties() {
return flowControllerProperties;
}
public CorePropertiesSchema getCoreProperties() {
return coreProperties;
}
public FlowFileRepositorySchema getFlowfileRepositoryProperties() {
return flowfileRepositoryProperties;
}
public ContentRepositorySchema getContentRepositoryProperties() {
return contentRepositoryProperties;
}
public SecurityPropertiesSchema getSecurityProperties() {
return securityProperties;
}
public ProcessGroupSchema getProcessGroupSchema() {
return processGroupSchema;
}
public ProvenanceReportingSchema getProvenanceReportingProperties() {
return provenanceReportingProperties;
}
public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() {
return componentStatusRepositoryProperties;
}
public ProvenanceRepositorySchema getProvenanceRepositorySchema() {
return provenanceRepositorySchema;
}
@Override
public int getVersion() {
return CONFIG_VERSION;
}
@Override
public ConfigSchema convert() {
return this;
}
private static class OverlapResults<T> {
private final Set<T> seen;
private final Set<T> duplicates;
private OverlapResults(Set<T> seen, Set<T> duplicates) {
this.seen = seen;
this.duplicates = duplicates;
}
}
}