* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.accumulo.core.client.mapreduce.lib.impl;
import static;
import static java.util.Objects.requireNonNull;
import java.util.Properties;
import java.util.Scanner;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
import org.apache.accumulo.core.client.impl.ClientConfConverter;
import org.apache.accumulo.core.client.impl.ClientInfoImpl;
import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
import org.apache.accumulo.core.client.mapreduce.impl.DelegationTokenStub;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
* @since 1.6.0
public class ConfiguratorBase {
protected static final Logger log = Logger.getLogger(ConfiguratorBase.class);
* Specifies that connection info was configured
* @since 1.6.0
public enum ConnectorInfo {
public enum ClientOpts {
* Configuration keys for general configuration options.
* @since 1.6.0
public enum GeneralOpts {
* Provides a configuration key for a given feature enum, prefixed by the implementingClass
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param e
* the enum used to provide the unique part of the configuration key
* @return the configuration key
* @since 1.6.0
protected static String enumToConfKey(Class<?> implementingClass, Enum<?> e) {
return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "."
+ StringUtils.camelize(;
* Provides a configuration key for a given feature enum.
* @param e
* the enum used to provide the unique part of the configuration key
* @return the configuration key
protected static String enumToConfKey(Enum<?> e) {
return e.getDeclaringClass().getSimpleName() + "."
+ StringUtils.camelize(;
public static ClientInfo updateToken( credentials,
ClientInfo info) {
ClientInfo result = info;
if (info.getAuthenticationToken() instanceof KerberosToken) {"Received KerberosToken, attempting to fetch DelegationToken");
try {
AccumuloClient client = Accumulo.newClient().usingClientInfo(info).build();
AuthenticationToken token = client.securityOperations()
.getDelegationToken(new DelegationTokenConfig());
result = Accumulo.newClient().usingClientInfo(info).usingToken(info.getPrincipal(), token)
} catch (Exception e) {
log.warn("Failed to automatically obtain DelegationToken, "
+ "Mappers/Reducers will likely fail to communicate with Accumulo", e);
// DelegationTokens can be passed securely from user to task without serializing insecurely in
// the configuration
if (info.getAuthenticationToken() instanceof DelegationTokenImpl) {
DelegationTokenImpl delegationToken = (DelegationTokenImpl) info.getAuthenticationToken();
// Convert it into a Hadoop Token
AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
Token<AuthenticationTokenIdentifier> hadoopToken = new Token<>(identifier.getBytes(),
delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName());
// Add the Hadoop Token to the Job so it gets serialized and passed along.
credentials.addToken(hadoopToken.getService(), hadoopToken);
return result;
public static void setClientInfo(Class<?> implementingClass, Configuration conf,
ClientInfo info) {
setClientProperties(implementingClass, conf, info.getProperties());
conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
public static ClientInfo getClientInfo(Class<?> implementingClass, Configuration conf) {
Properties props = getClientProperties(implementingClass, conf);
return new ClientInfoImpl(props);
public static void setClientPropertiesFile(Class<?> implementingClass, Configuration conf,
String clientPropertiesFile) {
try {
DistributedCacheHelper.addCacheFile(new URI(clientPropertiesFile), conf);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to add client properties file \""
+ clientPropertiesFile + "\" to distributed cache.");
conf.set(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), clientPropertiesFile);
conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
public static void setClientProperties(Class<?> implementingClass, Configuration conf,
Properties props) {
StringWriter writer = new StringWriter();
try {, "client properties");
} catch (IOException e) {
throw new IllegalStateException(e);
conf.set(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS), writer.toString());
public static Properties getClientProperties(Class<?> implementingClass, Configuration conf) {
String propString;
String clientPropsFile = conf
.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), "");
if (!clientPropsFile.isEmpty()) {
try {
URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
Path path = null;
for (URI u : uris) {
if (u.toString().equals(clientPropsFile)) {
path = new Path(u);
FileSystem fs = FileSystem.get(conf);
FSDataInputStream inputStream =;
StringBuilder sb = new StringBuilder();
try (Scanner scanner = new Scanner(inputStream)) {
while (scanner.hasNextLine()) {
sb.append(scanner.nextLine() + "\n");
propString = sb.toString();
} catch (IOException e) {
throw new IllegalStateException(
"Failed to read client properties from distributed cache: " + clientPropsFile);
} else {
propString = conf.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS), "");
Properties props = new Properties();
if (!propString.isEmpty()) {
try {
props.load(new StringReader(propString));
} catch (IOException e) {
throw new IllegalStateException(e);
return props;
* Sets the connector information needed to communicate with Accumulo in this job.
* <p>
* <b>WARNING:</b> The serialized token is stored in the configuration and shared with all
* MapReduce tasks. It is BASE64 encoded to provide a charset safe conversion to a string, and is
* not intended to be secure.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @param principal
* a valid Accumulo user name
* @param token
* the user's password
* @since 1.6.0
public static void setConnectorInfo(Class<?> implementingClass, Configuration conf,
String principal, AuthenticationToken token) {
checkArgument(principal != null, "principal is null");
checkArgument(token != null, "token is null");
Properties props = getClientProperties(implementingClass, conf);
props.setProperty(ClientProperty.AUTH_PRINCIPAL.getKey(), principal);
ClientProperty.setAuthenticationToken(props, token);
setClientProperties(implementingClass, conf, props);
conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
* Determines if the connector info has already been set for this instance.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @return true if the connector info has already been set, false otherwise
* @since 1.6.0
* @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
public static Boolean isConnectorInfoSet(Class<?> implementingClass, Configuration conf) {
return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), false);
* Gets the user name from the configuration.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @return the principal
* @since 1.6.0
* @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
public static String getPrincipal(Class<?> implementingClass, Configuration conf) {
Properties props = getClientProperties(implementingClass, conf);
return props.getProperty(ClientProperty.AUTH_PRINCIPAL.getKey());
* Gets the authenticated token from either the specified token file or directly from the
* configuration, whichever was used when the job was configured.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @return the principal's authentication token
* @since 1.6.0
* @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
public static AuthenticationToken getAuthenticationToken(Class<?> implementingClass,
Configuration conf) {
Properties props = getClientProperties(implementingClass, conf);
return ClientProperty.getAuthenticationToken(props);
* Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @param clientConfig
* client configuration for specifying connection timeouts, SSL connection options, etc.
* @since 1.6.0
* @deprecated since 2.0.0; use {@link #setClientInfo(Class, Configuration, ClientInfo)} instead
public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf,
org.apache.accumulo.core.client.ClientConfiguration clientConfig) {
Properties props = getClientProperties(implementingClass, conf);
Properties newProps = ClientConfConverter.toProperties(clientConfig);
for (Object keyObj : newProps.keySet()) {
String propKey = (String) keyObj;
String val = newProps.getProperty(propKey);
props.setProperty(propKey, val);
setClientProperties(implementingClass, conf, props);
* Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the
* configuration.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @return an Accumulo instance
* @since 1.6.0
* @deprecated since 2.0.0, replaced by {@link #getClientInfo(Class, Configuration)}
public static org.apache.accumulo.core.client.Instance getInstance(Class<?> implementingClass,
Configuration conf) {
return org.apache.accumulo.core.client.Connector.from(getClient(implementingClass, conf))
* Creates an Accumulo {@link AccumuloClient} based on the configuration
* @param implementingClass
* class whose name will be used as a prefix for the property configuration
* @param conf
* Hadoop configuration object
* @return Accumulo connector
* @since 2.0.0
public static AccumuloClient getClient(Class<?> implementingClass, Configuration conf) {
try {
return Accumulo.newClient().usingClientInfo(getClientInfo(implementingClass, conf)).build();
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IllegalStateException(e);
* Obtain a ClientConfiguration based on the configuration.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @return A ClientConfiguration
* @since 1.7.0
* @deprecated since 2.0.0; use {@link #getClientInfo(Class, Configuration)} instead
public static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration(
Class<?> implementingClass, Configuration conf) {
return ClientConfConverter.toClientConf(getClientInfo(implementingClass, conf).getProperties());
* Sets the log level for this job.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @param level
* the logging level
* @since 1.6.0
public static void setLogLevel(Class<?> implementingClass, Configuration conf, Level level) {
checkArgument(level != null, "level is null");
conf.setInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), level.toInt());
* Gets the log level from this configuration.
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @return the log level
* @since 1.6.0
* @see #setLogLevel(Class, Configuration, Level)
public static Level getLogLevel(Class<?> implementingClass, Configuration conf) {
return Level.toLevel(
conf.getInt(enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL), Level.INFO.toInt()));
* Sets the valid visibility count for this job.
* @param conf
* the Hadoop configuration object to configure
* @param visibilityCacheSize
* the LRU cache size
public static void setVisibilityCacheSize(Configuration conf, int visibilityCacheSize) {
conf.setInt(enumToConfKey(GeneralOpts.VISIBILITY_CACHE_SIZE), visibilityCacheSize);
* Gets the valid visibility count for this job.
* @param conf
* the Hadoop configuration object to configure
* @return the valid visibility count
public static int getVisibilityCacheSize(Configuration conf) {
return conf.getInt(enumToConfKey(GeneralOpts.VISIBILITY_CACHE_SIZE),
* Unwraps the provided {@link AuthenticationToken} if it is an instance of
* {@link DelegationTokenStub}, reconstituting it from the provided {@link JobConf}.
* @param job
* The job
* @param token
* The authentication token
public static AuthenticationToken unwrapAuthenticationToken(JobConf job,
AuthenticationToken token) {
if (token instanceof DelegationTokenStub) {
DelegationTokenStub delTokenStub = (DelegationTokenStub) token;
Token<? extends TokenIdentifier> hadoopToken = job.getCredentials()
.getToken(new Text(delTokenStub.getServiceName()));
AuthenticationTokenIdentifier identifier = new AuthenticationTokenIdentifier();
try {
.readFields(new DataInputStream(new ByteArrayInputStream(hadoopToken.getIdentifier())));
return new DelegationTokenImpl(hadoopToken.getPassword(), identifier);
} catch (IOException e) {
throw new RuntimeException("Could not construct DelegationToken from JobConf Credentials",
return token;
* Unwraps the provided {@link AuthenticationToken} if it is an instance of
* {@link DelegationTokenStub}, reconstituting it from the provided {@link JobConf}.
* @param job
* The job
* @param token
* The authentication token
public static AuthenticationToken unwrapAuthenticationToken(JobContext job,
AuthenticationToken token) {
if (token instanceof DelegationTokenStub) {
DelegationTokenStub delTokenStub = (DelegationTokenStub) token;
Token<? extends TokenIdentifier> hadoopToken = job.getCredentials()
.getToken(new Text(delTokenStub.getServiceName()));
AuthenticationTokenIdentifier identifier = new AuthenticationTokenIdentifier();
try {
.readFields(new DataInputStream(new ByteArrayInputStream(hadoopToken.getIdentifier())));
return new DelegationTokenImpl(hadoopToken.getPassword(), identifier);
} catch (IOException e) {
throw new RuntimeException("Could not construct DelegationToken from JobConf Credentials",
return token;