/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.accumulo;

import static com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.fate.Fate;
import org.apache.accumulo.start.Main;
import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.JarUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class to hold common methods shared by the InputFormat, OutputFormat and StorageHandler.
*/
public class HiveAccumuloHelper {
private static final Logger log = LoggerFactory.getLogger(HiveAccumuloHelper.class);
// Constant from Accumulo's AuthenticationTokenIdentifier
public static final Text ACCUMULO_SERVICE = new Text("ACCUMULO_AUTH_TOKEN");
/**
* Extract the appropriate Token for Accumulo from the provided {@code user} and add it to the
* {@link JobConf}'s credentials.
*
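* <p>A minimal usage sketch, assuming a current user whose credentials already hold an
* Accumulo token (the setup shown here is illustrative, not part of this class):
*
* <pre>{@code
* UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
* JobConf jobConf = new JobConf();
* new HiveAccumuloHelper().addTokenFromUserToJobConf(ugi, jobConf);
* }</pre>
*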
* @param user
* User containing tokens
* @param jobConf
* The configuration for the job
* @throws IOException
* If the correct token is not found or the Token fails to be merged with the
* configuration
*/
public void addTokenFromUserToJobConf(UserGroupInformation user, JobConf jobConf)
throws IOException {
checkNotNull(user, "Provided UGI was null");
checkNotNull(jobConf, "JobConf was null");
// The Accumulo token may already be present in the Configuration, but the Token must also be
// in the Job's credentials, as the AccumuloInputFormat expects
Token<?> accumuloToken = getAccumuloToken(user);
// If we didn't find the Token, we can't proceed. Log the tokens for debugging.
if (null == accumuloToken) {
log.error("Could not find accumulo token in user: " + user.getTokens());
throw new IOException("Could not find Accumulo Token in user's tokens");
}
// Add the Hadoop token back to the Job; the configuration still has the necessary
// Accumulo token information.
mergeTokenIntoJobConf(jobConf, accumuloToken);
}
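/**
* Find the Accumulo token in the provided {@code user}'s credentials, matching on the token
* kind {@link #ACCUMULO_SERVICE}.
*
* @param user
* User possibly containing an Accumulo token
* @return The Accumulo token, or null if no such token exists
*/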
public Token<?> getAccumuloToken(UserGroupInformation user) {
checkNotNull(user, "Provided UGI was null");
Collection<Token<? extends TokenIdentifier>> tokens = user.getTokens();
for (Token<?> token : tokens) {
if (ACCUMULO_SERVICE.equals(token.getKind())) {
return token;
}
}
return null;
}
/**
* Merge the provided <code>Token</code> into the JobConf.
*
* @param jobConf
* JobConf to merge token into
* @param accumuloToken
* The Token
* @throws IOException
* If the merging fails
*/
public void mergeTokenIntoJobConf(JobConf jobConf, Token<?> accumuloToken) throws IOException {
// Stage the token in the credentials of a copy of the JobConf
JobConf accumuloJobConf = new JobConf(jobConf);
accumuloJobConf.getCredentials().addToken(accumuloToken.getService(), accumuloToken);
// Merge the copy's credentials back into the original JobConf via the shims
ShimLoader.getHadoopShims().mergeCredentials(jobConf, accumuloJobConf);
}
/**
* Obtain a DelegationToken from Accumulo.
*
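* <p>A brief sketch pairing this method with {@link #getHadoopToken(AuthenticationToken)};
* how {@code conn} is obtained is left out:
*
* <pre>{@code
* HiveAccumuloHelper helper = new HiveAccumuloHelper();
* AuthenticationToken delegationToken = helper.getDelegationToken(conn);
* Token<? extends TokenIdentifier> hadoopToken = helper.getHadoopToken(delegationToken);
* }</pre>
*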
* @param conn
* The Accumulo connector
* @return The DelegationToken instance
* @throws IOException
* If the token cannot be obtained
*/
public AuthenticationToken getDelegationToken(Connector conn) throws IOException {
try {
DelegationTokenConfig config = new DelegationTokenConfig();
return conn.securityOperations().getDelegationToken(config);
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IOException("Failed to obtain DelegationToken", e);
}
}
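/**
* Convert an Accumulo {@link AuthenticationToken}, expected to be a
* {@code DelegationTokenImpl}, into a Hadoop {@link Token}.
*
* @param token
* The Accumulo delegation token
* @return A Hadoop token wrapping the identifier, password, kind and service of the
* Accumulo token
* @throws IOException
* If the token is not a DelegationTokenImpl or the conversion fails
*/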
public Token<? extends TokenIdentifier> getHadoopToken(AuthenticationToken token)
throws IOException {
if (!(token instanceof DelegationTokenImpl)) {
throw new IOException("Expected a DelegationTokenImpl but found " +
(token != null ? token.getClass() : "null"));
}
DelegationTokenImpl dt = (DelegationTokenImpl) token;
try {
AuthenticationTokenIdentifier identifier = dt.getIdentifier();
return new Token<AuthenticationTokenIdentifier>(identifier.getBytes(),
dt.getPassword(), identifier.getKind(), dt.getServiceName());
} catch (Exception e) {
throw new IOException("Failed to create Hadoop token from Accumulo DelegationToken", e);
}
}
/**
* Construct a <code>ClientConfiguration</code> instance.
*
* @param zookeepers
* ZooKeeper hosts
* @param instanceName
* Instance name
* @param useSasl
* Is SASL enabled
* @return A ClientConfiguration instance
*/
public ClientConfiguration getClientConfiguration(String zookeepers, String instanceName,
boolean useSasl) {
return ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers)
.withSasl(useSasl);
}
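/**
* Ensures the JobConf carries an Accumulo token usable by the AccumuloInputFormat, obtaining
* a new DelegationToken if the current user does not already hold one.
*/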
public void updateInputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
AccumuloConnectionParameters cnxnParams) throws IOException {
updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, true);
}
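/**
* Ensures the JobConf carries an Accumulo token usable by the AccumuloOutputFormat, obtaining
* a new DelegationToken if the current user does not already hold one.
*/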
public void updateOutputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
AccumuloConnectionParameters cnxnParams) throws IOException {
updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, false);
}
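/**
* Common implementation for the input- and output-format variants above: reuse the Accumulo
* token from the UGI if present, otherwise acquire a DelegationToken and propagate it into
* both the JobConf and the UGI.
*/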
void updateConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
AccumuloConnectionParameters cnxnParams, boolean isInputFormat) throws IOException {
if (getAccumuloToken(currentUser) != null) {
addTokenFromUserToJobConf(currentUser, jobConf);
} else {
try {
Connector connector = cnxnParams.getConnector();
// If we have Kerberos credentials, we should obtain the delegation token
AuthenticationToken token = getDelegationToken(connector);
// Send the DelegationToken down to the Configuration for Accumulo to use
if (isInputFormat) {
setInputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token);
} else {
setOutputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token);
}
// Convert the Accumulo token into a Hadoop token
Token<? extends TokenIdentifier> accumuloToken = getHadoopToken(token);
// Add the Hadoop token to the JobConf
mergeTokenIntoJobConf(jobConf, accumuloToken);
// Make sure the UGI contains the token too for good measure
if (!currentUser.addToken(accumuloToken)) {
throw new IOException("Failed to add Accumulo Token to UGI");
}
try {
addTokenFromUserToJobConf(currentUser, jobConf);
} catch (IOException e) {
throw new IOException("Current user did not contain necessary delegation Tokens " + currentUser, e);
}
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IOException("Failed to acquire Accumulo DelegationToken", e);
}
}
}
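/**
* Checks whether the given UGI authenticated via Kerberos.
*/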
public boolean hasKerberosCredentials(UserGroupInformation ugi) {
// Allows mocking in testing.
return ugi.getAuthenticationMethod() == AuthenticationMethod.KERBEROS;
}
/**
* Calls {@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)},
* suppressing exceptions due to setting the configuration multiple times.
*/
public void setInputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token)
throws AccumuloSecurityException {
try {
AccumuloInputFormat.setConnectorInfo(conf, username, token);
} catch (IllegalStateException e) {
// AccumuloInputFormat complains if you re-set an already set value. We just don't care.
log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
}
}
/**
* Calls {@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
* suppressing exceptions due to setting the configuration multiple times.
*/
public void setOutputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token)
throws AccumuloSecurityException {
try {
AccumuloOutputFormat.setConnectorInfo(conf, username, token);
} catch (IllegalStateException e) {
// AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
}
}
/**
* Calls {@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)},
* suppressing exceptions due to setting the configuration multiple times.
*/
public void setInputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers,
boolean isSasl) throws IOException {
try {
ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl);
AccumuloInputFormat.setZooKeeperInstance(conf, clientConf);
} catch (IllegalStateException ise) {
// AccumuloInputFormat complains if you re-set an already set value. We just don't care.
log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+ zookeepers, ise);
}
}
/**
* Calls {@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)},
* suppressing exceptions due to setting the configuration multiple times.
*/
public void setOutputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers,
boolean isSasl) throws IOException {
try {
ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl);
AccumuloOutputFormat.setZooKeeperInstance(conf, clientConf);
} catch (IllegalStateException ise) {
// AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+ zookeepers, ise);
}
}
/**
* Calls {@link AccumuloInputFormat#setMockInstance(JobConf, String)}, suppressing exceptions due
* to setting the configuration multiple times.
*/
public void setInputFormatMockInstance(JobConf conf, String instanceName) {
try {
AccumuloInputFormat.setMockInstance(conf, instanceName);
} catch (IllegalStateException e) {
// AccumuloInputFormat complains if you re-set an already set value. We just don't care.
log.debug("Ignoring exception setting mock instance of " + instanceName, e);
}
}
/**
* Calls {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}, suppressing exceptions
* due to setting the configuration multiple times.
*/
public void setOutputFormatMockInstance(JobConf conf, String instanceName) {
try {
AccumuloOutputFormat.setMockInstance(conf, instanceName);
} catch (IllegalStateException e) {
// AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
log.debug("Ignoring exception setting mock instance of " + instanceName, e);
}
}
/**
* Sets all jars required by Accumulo input/output tasks in the configuration to be dynamically
* loaded when the task is executed.
*/
public void loadDependentJars(Configuration conf) {
@SuppressWarnings("deprecation")
List<Class<?>> classesToLoad = new ArrayList<>(Arrays.asList(Tracer.class, Fate.class,
Connector.class, Main.class, ZooKeeper.class, AccumuloStorageHandler.class));
try {
classesToLoad.add(Class.forName("org.apache.htrace.Trace"));
} catch (Exception e) {
log.warn("Failed to load class for HTrace jar, trying to continue", e);
}
try {
JarUtils.addDependencyJars(conf, classesToLoad);
} catch (IOException e) {
log.error("Could not add necessary Accumulo dependencies to classpath", e);
}
}
/**
* Obtains an Accumulo DelegationToken and sets it in the configuration for input and output jobs.
* The Accumulo token is converted into a Hadoop-style token and returned to the caller.
*
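* <p>A rough sketch of the expected call pattern (the construction of the parameters object
* and connector shown here is illustrative):
*
* <pre>{@code
* HiveAccumuloHelper helper = new HiveAccumuloHelper();
* AccumuloConnectionParameters params = new AccumuloConnectionParameters(conf);
* Connector conn = params.getConnector();
* Token<? extends TokenIdentifier> hadoopToken =
* helper.setConnectorInfoForInputAndOutput(params, conn, conf);
* }</pre>
*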
* @return A Hadoop-style token which contains the Accumulo DelegationToken
*/
public Token<? extends TokenIdentifier> setConnectorInfoForInputAndOutput(
AccumuloConnectionParameters params, Connector conn, Configuration conf) throws Exception {
// Obtain a delegation token from Accumulo
AuthenticationToken token = getDelegationToken(conn);
// Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo
// AuthenticationToken is serialized, not the entire token). configureJobConf may be
// called multiple times with the same JobConf, which results in an error from the Accumulo
// MapReduce API. Catch the error, log a debug message, and keep going.
try {
InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, conf,
params.getAccumuloUserName(), token);
} catch (IllegalStateException e) {
// The implementation balks when this method is invoked multiple times
log.debug("Ignoring IllegalStateException about re-setting connector information", e);
}
try {
OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, conf,
params.getAccumuloUserName(), token);
} catch (IllegalStateException e) {
// The implementation balks when this method is invoked multiple times
log.debug("Ignoring IllegalStateException about re-setting connector information", e);
}
return getHadoopToken(token);
}
}