blob: 65d5b9dc568caa3b8bdf2015bddf839adc6dd352 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.confluent.schemaregistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType;
import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.SchemaRegistryClient;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.ssl.SSLContextService;
/**
 * {@link SchemaRegistry} controller service backed by a Confluent Schema Registry.
 *
 * <p>When enabled, it builds a {@link RestSchemaRegistryClient} from these inputs:
 * <ul>
 *   <li>the configured URLs;</li>
 *   <li>the communications timeout;</li>
 *   <li>an optional SSL context;</li>
 *   <li>optional Basic credentials;</li>
 *   <li>any {@code request.header.*} dynamic properties.</li>
 * </ul>
 * That client is wrapped in a {@link CachingSchemaRegistryClient}.
 *
 * <p>Schemas are retrieved by subject name (optionally with a version) when a name is present in the
 * {@link SchemaIdentifier}. Otherwise they are retrieved by numeric schema identifier.
 */
@Tags({"schema", "registry", "confluent", "avro", "kafka"})
@CapabilityDescription("Provides a Schema Registry that interacts with the Confluent Schema Registry so that those Schemas that are stored in the Confluent Schema "
        + "Registry can be used in NiFi. The Confluent Schema Registry has a notion of a \"subject\" for schemas, which is their terminology for a schema name. When a Schema "
        + "is looked up by name by this registry, it will find a Schema in the Confluent Schema Registry with that subject.")
@DynamicProperty(name = "request.header.*", value = "String literal, may not be empty", description = "Properties that begin with 'request.header.' " +
        "are populated into a map and passed as http headers in REST requests to the Confluent Schema Registry")
public class ConfluentSchemaRegistry extends AbstractControllerService implements SchemaRegistry {

    /** Schema fields this registry can supply; returned by {@link #getSuppliedSchemaFields()}. */
    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
            SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);

    /** Dynamic properties with this prefix become HTTP headers; the header name is the remainder of the property name. */
    private static final String REQUEST_HEADER_PREFIX = "request.header.";

    static final PropertyDescriptor SCHEMA_REGISTRY_URLS = new PropertyDescriptor.Builder()
            .name("url")
            .displayName("Schema Registry URLs")
            .description("A comma-separated list of URLs of the Schema Registry to interact with")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .defaultValue("http://localhost:8081")
            .required(true)
            .addValidator(new MultipleURLValidator())
            .build();

    static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
            .name("ssl-context")
            .displayName("SSL Context Service")
            .description("Specifies the SSL Context Service to use for interacting with the Confluent Schema Registry")
            .identifiesControllerService(SSLContextService.class)
            .required(false)
            .build();

    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("cache-size")
            .displayName("Cache Size")
            .description("Specifies how many Schemas should be cached from the Schema Registry")
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .defaultValue("1000")
            .required(true)
            .build();

    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
            .name("cache-expiration")
            .displayName("Cache Expiration")
            .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
                    + "cached version of a schema will no longer be used, and the service will have to communicate with the "
                    + "Schema Registry again in order to obtain the schema.")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("1 hour")
            .required(true)
            .build();

    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
            .name("timeout")
            .displayName("Communications Timeout")
            .description("Specifies how long to wait to receive data from the Schema Registry before considering the communications a failure")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .defaultValue("30 secs")
            .required(true)
            .build();

    static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
            .name("authentication-type")
            .displayName("Authentication Type")
            .description("HTTP Client Authentication Type for Confluent Schema Registry")
            .required(false)
            .allowableValues(AuthenticationType.values())
            .defaultValue(AuthenticationType.NONE.toString())
            .build();

    // The character class deliberately skips 0x3A (':'). A colon separates the user-id from the
    // password in HTTP Basic credentials, so it cannot appear in the user-id itself.
    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
            .name("username")
            .displayName("Username")
            .description("Username for authentication to Confluent Schema Registry")
            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
            .required(false)
            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString())
            .build();

    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
            .name("password")
            .displayName("Password")
            .description("Password for authentication to Confluent Schema Registry")
            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
            .required(false)
            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString())
            .sensitive(true)
            .build();

    /** Caching client created in {@link #onEnabled}; volatile so lookups on other threads see the latest instance. */
    private volatile SchemaRegistryClient client;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(SCHEMA_REGISTRY_URLS);
        properties.add(SSL_CONTEXT);
        properties.add(TIMEOUT);
        properties.add(CACHE_SIZE);
        properties.add(CACHE_EXPIRATION);
        properties.add(AUTHENTICATION_TYPE);
        properties.add(USERNAME);
        properties.add(PASSWORD);
        return properties;
    }

    /**
     * Validates dynamic property names. A name is accepted only when it starts with
     * {@code request.header.} and has at least one character after the prefix.
     */
    private static final Validator REQUEST_HEADER_VALIDATOR = new Validator() {
        @Override
        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
            return new ValidationResult.Builder()
                    .subject(subject)
                    .input(value)
                    .valid(subject.startsWith(REQUEST_HEADER_PREFIX)
                            && subject.length() > REQUEST_HEADER_PREFIX.length())
                    .explanation("Dynamic property names must be of format 'request.header.*'")
                    .build();
        }
    };

    /**
     * Builds a descriptor for any user-added property. Its value must be non-empty, and its name must
     * follow the {@code request.header.*} format.
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptionName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptionName)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .addValidator(REQUEST_HEADER_VALIDATOR)
                .build();
    }

    /**
     * Creates the REST client from the current configuration and wraps it in a caching client.
     *
     * @param context configuration of this controller service
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        final List<String> baseUrls = getBaseURLs(context);
        final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();

        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
        final SSLContext sslContext = sslContextService == null ? null : sslContextService.createContext();

        // NOTE(review): credentials are read whatever Authentication Type is selected. Confirm whether the framework
        // or RestSchemaRegistryClient ignores leftover values when the type is not BASIC.
        final String username = context.getProperty(USERNAME).getValue();
        final String password = context.getProperty(PASSWORD).getValue();

        final Map<String, String> httpHeaders = getRequestHeaders(context);

        final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis,
                sslContext, username, password, getLogger(), httpHeaders);

        final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
        final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue();
        client = new CachingSchemaRegistryClient(restClient, cacheSize, cacheExpiration);
    }

    /**
     * Collects the {@code request.header.*} properties into an HTTP header map.
     * Each key is the property name with the prefix removed, and each value is the property value.
     */
    private static Map<String, String> getRequestHeaders(final ConfigurationContext context) {
        return context.getProperties().entrySet()
                .stream()
                .filter(entry -> entry.getKey().getName().startsWith(REQUEST_HEADER_PREFIX))
                .collect(Collectors.toMap(
                        entry -> entry.getKey().getName().substring(REQUEST_HEADER_PREFIX.length()),
                        Map.Entry::getValue));
    }

    /**
     * Cross-property validation.
     *
     * <ul>
     *   <li>When an SSL Context Service is set, every Schema Registry URL must start with {@code https}.
     *       The first offending URL is reported.</li>
     *   <li>When the Authentication Type is BASIC, Username and Password must both be non-blank.</li>
     * </ul>
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();

        final boolean sslContextSet = validationContext.getProperty(SSL_CONTEXT).isSet();
        if (sslContextSet) {
            final List<String> baseUrls = getBaseURLs(validationContext);
            final List<String> insecure = baseUrls.stream()
                    .filter(url -> !url.startsWith("https"))
                    .collect(Collectors.toList());

            if (!insecure.isEmpty()) {
                results.add(new ValidationResult.Builder()
                        .subject(SCHEMA_REGISTRY_URLS.getDisplayName())
                        .input(insecure.get(0))
                        .valid(false)
                        .explanation("When SSL Context is configured, all Schema Registry URL's must use HTTPS, not HTTP")
                        .build());
            }
        }

        final PropertyValue authenticationTypeProperty = validationContext.getProperty(AUTHENTICATION_TYPE);
        if (authenticationTypeProperty.isSet()) {
            final AuthenticationType authenticationType = AuthenticationType.valueOf(authenticationTypeProperty.getValue());
            if (AuthenticationType.BASIC.equals(authenticationType)) {
                final String username = validationContext.getProperty(USERNAME).getValue();
                if (StringUtils.isBlank(username)) {
                    results.add(new ValidationResult.Builder()
                            .subject(USERNAME.getDisplayName())
                            .valid(false)
                            .explanation("Username is required for Basic Authentication")
                            .build());
                }

                final String password = validationContext.getProperty(PASSWORD).getValue();
                if (StringUtils.isBlank(password)) {
                    results.add(new ValidationResult.Builder()
                            .subject(PASSWORD.getDisplayName())
                            .valid(false)
                            .explanation("Password is required for Basic Authentication")
                            .build());
                }
            }
        }

        return results;
    }

    /**
     * Splits the Schema Registry URLs property on commas and trims whitespace from each entry.
     * Expression Language is evaluated first.
     */
    private List<String> getBaseURLs(final PropertyContext context) {
        final String urls = context.getProperty(SCHEMA_REGISTRY_URLS).evaluateAttributeExpressions().getValue();
        return Stream.of(urls.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    /**
     * Looks up a schema by subject name. A specific version is requested when the identifier carries one.
     *
     * @throws SchemaNotFoundException if the identifier has no name, or if the client cannot find the schema
     */
    private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
        final Optional<String> schemaName = schemaIdentifier.getName();
        if (!schemaName.isPresent()) {
            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
        }

        if (schemaIdentifier.getVersion().isPresent()) {
            return client.getSchema(schemaName.get(), schemaIdentifier.getVersion().getAsInt());
        }
        return client.getSchema(schemaName.get());
    }

    /**
     * Looks up a schema by its numeric identifier.
     *
     * @throws SchemaNotFoundException if the identifier is absent, does not fit in an {@code int},
     *                                 or cannot be found by the client
     */
    private RecordSchema retrieveSchemaById(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
        final OptionalLong schemaId = schemaIdentifier.getIdentifier();
        if (!schemaId.isPresent()) {
            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
        }

        final long id = schemaId.getAsLong();
        // The client lookup takes an int id. A plain narrowing cast would silently wrap out-of-range values
        // onto a different id and could return an unrelated schema, so reject them explicitly.
        if (id < Integer.MIN_VALUE || id > Integer.MAX_VALUE) {
            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Id " + id + " is outside the supported integer range");
        }

        return client.getSchema((int) id);
    }

    /**
     * Retrieves a schema, preferring lookup by name when the identifier has one; otherwise looks it up by id.
     */
    @Override
    public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
        if (schemaIdentifier.getName().isPresent()) {
            return retrieveSchemaByName(schemaIdentifier);
        }
        return retrieveSchemaById(schemaIdentifier);
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return schemaFields;
    }
}