blob: bd9f7856e4d7ff1a97a2df42e3de78a152b93127 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.accumulo.controllerservices;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
* Purpose: Controller service that provides us a configured connector. Note that we don't need to close this
*
* Justification: Centralizes the configuration of the connecting accumulo code. This also will be used
* for any kerberos integration.
*/
@RequiresInstanceClassLoading
@Tags({"accumulo", "client", "service"})
@CapabilityDescription("A controller service for accessing an Accumulo Client.")
public class AccumuloService extends AbstractControllerService implements BaseAccumuloService {
private enum AuthenticationType {
PASSWORD,
KERBEROS,
NONE
}
protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
.name("ZooKeeper Quorum")
.displayName("ZooKeeper Quorum")
.description("Comma-separated list of ZooKeeper hosts for Accumulo.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder()
.name("Instance Name")
.displayName("Instance Name")
.description("Instance name of the Accumulo cluster")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
.name("accumulo-authentication-type")
.displayName("Authentication Type")
.description("Authentication Type")
.allowableValues(AuthenticationType.values())
.defaultValue(AuthenticationType.PASSWORD.toString())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
.name("Accumulo User")
.displayName("Accumulo User")
.description("Connecting user for Accumulo")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
.build();
protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
.name("Accumulo Password")
.displayName("Accumulo Password")
.description("Connecting user's password")
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
.build();
protected static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();
protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for principal + keytab Kerberos authentication")
.identifiesControllerService(KerberosCredentialsService.class)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
protected static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
.displayName("Kerberos Principal")
.description("Kerberos Principal")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
protected static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
.name("kerberos-password")
.displayName("Kerberos Password")
.description("Kerberos Password")
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
protected static final PropertyDescriptor ACCUMULO_SASL_QOP = new PropertyDescriptor.Builder()
.name("accumulo-sasl-qop")
.displayName("Accumulo SASL quality of protection")
.description("Accumulo SASL quality of protection for KERBEROS Authentication type")
.allowableValues("auth", "auth-int", "auth-conf")
.defaultValue("auth-conf")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
.build();
/**
* Reference to the accumulo client.
*/
AccumuloClient client;
/**
* properties
*/
private List<PropertyDescriptor> properties;
private KerberosUser kerberosUser;
private AuthenticationType authType;
@Override
protected void init(ControllerServiceInitializationContext config) {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(ZOOKEEPER_QUORUM);
props.add(INSTANCE_NAME);
props.add(AUTHENTICATION_TYPE);
props.add(ACCUMULO_USER);
props.add(ACCUMULO_PASSWORD);
props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
props.add(ACCUMULO_SASL_QOP);
properties = Collections.unmodifiableList(props);
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>();
if (!validationContext.getProperty(INSTANCE_NAME).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(INSTANCE_NAME.getName()).explanation("Instance name must be supplied").build());
}
if (!validationContext.getProperty(ZOOKEEPER_QUORUM).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build());
}
final AuthenticationType type = validationContext.getProperty(
AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.NONE;
switch(type){
case PASSWORD:
if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
problems.add(
new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied for the Password Authentication type").build());
}
if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
problems.add(
new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
.explanation("Password must be supplied for the Password Authentication type").build());
}
break;
case KERBEROS:
if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()
&& !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Either Kerberos Password, Kerberos Credential Service, or Kerberos User Service must be set").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Kerberos Password and Kerberos Credential Service should not be filled out at the same time").build());
} else if (validationContext.getProperty(KERBEROS_PASSWORD).isSet() && !validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) {
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal must be supplied when principal + password Kerberos authentication is used").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal (for password) should not be filled out when principal + keytab Kerberos authentication is used").build());
} else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()) {
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_USER_SERVICE.getName())
.explanation("Kerberos User Service cannot be specified while also specifying a Kerberos Credential Service").build());
} else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Kerberos Password and Kerberos User Service should not be filled out at the same time").build());
} else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal (for password) should not be filled out when Kerberos User Service is used").build());
}
break;
default:
problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non supported Authentication type").build());
}
return problems;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
}
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
this.authType = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue());
final Properties clientConf = new Properties();
clientConf.setProperty("instance.zookeepers", zookeepers);
clientConf.setProperty("instance.name", instanceName);
switch(authType){
case PASSWORD:
final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
final AuthenticationToken token = new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
this.client = Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
break;
case KERBEROS:
if (kerberosUserService != null) {
this.kerberosUser = kerberosUserService.createKerberosUser();
} else if (kerberosCredentialsService != null) {
this.kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else {
this.kerberosUser = new KerberosPasswordUser(context.getProperty(KERBEROS_PRINCIPAL).getValue(), context.getProperty(KERBEROS_PASSWORD).getValue());
}
clientConf.setProperty("sasl.enabled", "true");
clientConf.setProperty("sasl.qop", context.getProperty(ACCUMULO_SASL_QOP).getValue());
//Client uses the currently logged in user's security context, so need to login first.
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
final UserGroupInformation clientUgi = SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
this.client = clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
Accumulo.newClient().from(clientConf).as(kerberosUser.getPrincipal(), new KerberosToken()).build());
break;
default:
throw new InitializationException("Not supported authentication type.");
}
}
@Override
public AccumuloClient getClient() {
return client;
}
@Override
public void renewTgtIfNecessary() {
if (authType.equals(AuthenticationType.KERBEROS)) {
SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUser);
}
}
@OnDisabled
public void shutdown() {
if (client != null) {
client.close();
}
}
}