blob: c60c811ac1363932dfd94d82e26f6cc340716e0f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.dependency;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.api.ConnectionFailureException;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatPartition;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
import org.apache.oozie.action.hadoop.LauncherURIHandler;
import org.apache.oozie.dependency.hcat.HCatMessageHandler;
import org.apache.oozie.service.HCatAccessorException;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
public class HCatURIHandler implements URIHandler {
// URI schemes served by this handler; filled in by init() from configuration, defaulting to "hcat".
private Set<String> supportedSchemes;
// Dependency type already resolved for a URI authority; read and written by getDependencyType().
private Map<String, DependencyType> dependencyTypes;
// Value returned by getClassesForLauncher(); obtained from HCatLauncherURIHandler in init().
private List<Class<?>> classesToShip;
/**
 * Prepares the handler for use: resets the dependency-type cache, loads the
 * supported URI schemes from configuration (falling back to "hcat") and
 * collects the classes reported by {@link HCatLauncherURIHandler}.
 *
 * @param conf configuration the supported schemes are read from
 */
@Override
public void init(Configuration conf) {
    dependencyTypes = new HashMap<String, DependencyType>();
    supportedSchemes = new HashSet<String>();
    // The configuration key is built from the prefix, this class' simple name and the suffix.
    final String schemesKey = URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
            + this.getClass().getSimpleName()
            + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX;
    final String[] configuredSchemes = conf.getStrings(schemesKey, "hcat");
    for (String scheme : Arrays.asList(configuredSchemes)) {
        supportedSchemes.add(scheme);
    }
    classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
}
/**
 * Returns the URI schemes this handler was initialized with.
 *
 * @return the scheme set populated by {@code init}
 */
@Override
public Set<String> getSupportedSchemes() {
    return this.supportedSchemes;
}
/**
 * Returns the launcher-side handler class that corresponds to this handler.
 *
 * @return always {@code HCatLauncherURIHandler.class}
 */
@Override
public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
    final Class<? extends LauncherURIHandler> launcherHandler = HCatLauncherURIHandler.class;
    return launcherHandler;
}
/**
 * Returns the classes collected from {@link HCatLauncherURIHandler} during {@code init}.
 *
 * @return the class list stored by {@code init}
 */
@Override
public List<Class<?>> getClassesForLauncher() {
    return this.classesToShip;
}
/**
 * Determines whether availability of the given URI is pushed to Oozie or has to be pulled.
 * <p>
 * Without an {@link HCatAccessorService} the answer is always {@code PULL}. Otherwise the
 * answer is resolved once per URI authority and remembered in {@code dependencyTypes}.
 *
 * @param uri the HCat URI whose dependency type is requested
 * @return {@code PUSH} if the accessor service knows the publisher, {@code PULL} otherwise
 * @throws URIHandlerException declared by the interface
 */
@Override
public DependencyType getDependencyType(URI uri) throws URIHandlerException {
    // The service is looked up on every call instead of in a constructor: it is part of
    // oozie.services.ext and is initialized after URIHandlerService.
    final HCatAccessorService accessorService = Services.get().get(HCatAccessorService.class);
    if (accessorService == null) {
        return DependencyType.PULL;
    }
    final String authority = uri.getAuthority();
    final DependencyType cached = dependencyTypes.get(authority);
    if (cached != null) {
        return cached;
    }
    final DependencyType resolved;
    if (accessorService.isKnownPublisher(uri)) {
        resolved = DependencyType.PUSH;
    }
    else {
        resolved = DependencyType.PULL;
    }
    dependencyTypes.put(authority, resolved);
    return resolved;
}
/**
 * Registers the action as waiting on the given HCat URI.
 * <p>
 * If the accessor service is not yet registered for notifications on the URI, the message bus
 * topic of the table is looked up and a {@link HCatMessageHandler} is registered for it. When
 * the table has no topic the method returns without recording the missing dependency.
 *
 * @param uri the HCat URI to register for
 * @param conf configuration used to create the HCat client
 * @param user user on whose behalf the registration happens (not used by this method)
 * @param actionID id of the action waiting on the dependency
 * @throws URIHandlerException if the URI is malformed or HCat access fails
 */
@Override
public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
        throws URIHandlerException {
    final HCatURI parsedUri;
    try {
        parsedUri = new HCatURI(uri);
    }
    catch (URISyntaxException syntaxError) {
        throw new URIHandlerException(ErrorCode.E0906, uri, syntaxError);
    }

    final HCatAccessorService accessorService = Services.get().get(HCatAccessorService.class);
    final boolean alreadyRegistered = accessorService.isRegisteredForNotification(parsedUri);
    if (!alreadyRegistered) {
        final HCatClient metastoreClient = getHCatClient(uri, conf);
        try {
            final String topicName = metastoreClient.getMessageBusTopicName(parsedUri.getDb(),
                    parsedUri.getTable());
            if (topicName == null) {
                // No topic to listen on: nothing is registered and no dependency is recorded.
                return;
            }
            final HCatMessageHandler messageHandler = new HCatMessageHandler(uri.getAuthority());
            accessorService.registerForNotification(parsedUri, topicName, messageHandler);
        }
        catch (HCatException hcatError) {
            throw new HCatAccessorException(ErrorCode.E1501, hcatError);
        }
        finally {
            closeQuietly(metastoreClient, null, true);
        }
    }

    Services.get().get(PartitionDependencyManagerService.class).addMissingDependency(parsedUri, actionID);
}
/**
 * Removes the action from the missing dependencies recorded for the given HCat URI.
 *
 * @param uri the HCat URI the action was waiting on
 * @param actionID id of the waiting action
 * @return the result of {@code PartitionDependencyManagerService.removeMissingDependency}
 * @throws RuntimeException if the URI cannot be parsed as an HCat URI
 */
@Override
public boolean unregisterFromNotification(URI uri, String actionID) {
    final HCatURI parsedUri;
    try {
        parsedUri = new HCatURI(uri);
    }
    catch (URISyntaxException syntaxError) {
        // Unexpected at this point
        throw new RuntimeException(syntaxError);
    }
    return Services.get().get(PartitionDependencyManagerService.class)
            .removeMissingDependency(parsedUri, actionID);
}
/**
 * Creates a context holding an HCat client for the given URI.
 *
 * @param uri the HCat URI to be accessed
 * @param conf configuration used to create the client
 * @param user name of the user the URI should be accessed as
 * @param readOnly whether the context will be used for read operations only
 * @return a new {@link HCatContext}
 * @throws URIHandlerException if the HCat client cannot be created
 */
@Override
public Context getContext(URI uri, Configuration conf, String user, boolean readOnly)
        throws URIHandlerException {
    if (readOnly) {
        // Read operations are allowed for any user in HCat, so access happens as the Oozie server.
        return new HCatContext(conf, user, getHCatClient(uri, conf));
    }
    // Write operations are performed through a client created with doAs as the user.
    return new HCatContext(conf, user, getHCatClient(uri, conf, user));
}
/**
 * Checks whether the table or partitions named by the URI exist, using the client held by the
 * given context. The client is left open.
 *
 * @param uri the HCat URI to check
 * @param context an {@link HCatContext} providing the client
 * @return true if the URI resolves to at least one table or partition
 * @throws URIHandlerException if the check fails
 */
@Override
public boolean exists(URI uri, Context context) throws URIHandlerException {
    final HCatContext hcatContext = (HCatContext) context;
    return exists(uri, hcatContext.getHCatClient(), false);
}
/**
 * Checks whether the table or partitions named by the URI exist, using a client created for
 * this call. The client is closed before returning.
 *
 * @param uri the HCat URI to check
 * @param conf configuration used to create the client
 * @param user requesting user (not used by this method)
 * @return true if the URI resolves to at least one table or partition
 * @throws URIHandlerException if the client cannot be created or the check fails
 */
@Override
public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
    final HCatClient oneShotClient = getHCatClient(uri, conf);
    final boolean closeWhenDone = true;
    return exists(uri, oneShotClient, closeWhenDone);
}
/**
 * Drops what the URI names, using the client held by the given context: the table when the
 * URI carries no partition keys, otherwise the matching partitions.
 *
 * @param uri the HCat URI to delete
 * @param context an {@link HCatContext} providing the client
 * @throws URIHandlerException if the URI is malformed or the drop fails
 */
@Override
public void delete(URI uri, Context context) throws URIHandlerException {
    final HCatClient contextClient = ((HCatContext) context).getHCatClient();
    try {
        final HCatURI target = new HCatURI(uri);
        if (target.getPartitionMap().isEmpty()) {
            contextClient.dropTable(target.getDb(), target.getTable(), true);
        }
        else {
            contextClient.dropPartitions(target.getDb(), target.getTable(), target.getPartitionMap(), true);
        }
    }
    catch (URISyntaxException syntaxError) {
        throw new HCatAccessorException(ErrorCode.E1501, syntaxError);
    }
    catch (HCatException hcatError) {
        throw new HCatAccessorException(ErrorCode.E1501, hcatError);
    }
}
/**
 * Drops what the URI names, using a client created as the given user: the table when the URI
 * carries no partition keys, otherwise the matching partitions. The client is closed, and its
 * delegation token (if any) cancelled, before returning.
 *
 * @param uri the HCat URI to delete
 * @param conf configuration used to create the client
 * @param user user the delete is performed as
 * @throws URIHandlerException if the URI is malformed, the client cannot be created or the drop fails
 */
@Override
public void delete(URI uri, Configuration conf, String user) throws URIHandlerException {
    HCatClientWithToken tokenedClient = null;
    HCatClient rawClient = null;
    try {
        final HCatURI target = new HCatURI(uri);
        tokenedClient = getHCatClient(uri, conf, user);
        rawClient = tokenedClient.getHCatClient();
        if (target.getPartitionMap().isEmpty()) {
            rawClient.dropTable(target.getDb(), target.getTable(), true);
        }
        else {
            rawClient.dropPartitions(target.getDb(), target.getTable(), target.getPartitionMap(), true);
        }
    }
    catch (URISyntaxException syntaxError) {
        throw new HCatAccessorException(ErrorCode.E1501, syntaxError);
    }
    catch (HCatException hcatError) {
        throw new HCatAccessorException(ErrorCode.E1501, hcatError);
    }
    finally {
        // The wrapper is still null when URI parsing or client creation failed.
        final String tokenToCancel = (tokenedClient == null) ? null : tokenedClient.getDelegationToken();
        closeQuietly(rawClient, tokenToCancel, true);
    }
}
/**
 * Returns the given URI unchanged; the done flag is not used by this handler.
 *
 * @param uri the URI string
 * @param doneFlag the done flag (ignored)
 * @return {@code uri} as passed in
 */
@Override
public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
return uri;
}
/**
 * Returns the given URI unchanged; the done flag is not used by this handler.
 *
 * @param uri the URI string
 * @param doneFlag the done flag (ignored)
 * @return {@code uri} as passed in
 */
@Override
public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException {
return uri;
}
/**
 * Validates the syntax of an HCat URI string by attempting to parse it.
 *
 * @param uri the URI string to validate
 * @throws URIHandlerException with error code E0906 if the URI cannot be parsed
 */
@Override
public void validate(String uri) throws URIHandlerException {
    try {
        // Constructed only for its parsing check; the instance itself is discarded.
        new HCatURI(uri);
    }
    catch (URISyntaxException syntaxError) {
        throw new URIHandlerException(ErrorCode.E0906, uri, syntaxError);
    }
}
/**
 * No-op: this handler performs no work on destroy.
 */
@Override
public void destroy() {
}
/**
 * Builds the HiveConf used to connect to the metastore behind the given URI.
 * <p>
 * The configuration held by {@link HCatAccessorService} takes precedence over {@code conf} when
 * it is set. The metastore URI is always written to the result; "hive.metastore.local" is
 * switched to "false" only when that URI is non-empty.
 *
 * @param uri the HCat URI whose metastore should be addressed
 * @param conf fallback configuration when the accessor service has none
 * @return a new HiveConf pointing at the metastore
 * @throws HCatAccessorException if the metastore URI cannot be derived from {@code uri}
 */
private HiveConf getHiveConf(URI uri, Configuration conf) throws HCatAccessorException {
    final HCatAccessorService accessorService = Services.get().get(HCatAccessorService.class);
    final Configuration serviceConf = accessorService.getHCatConf();
    final Configuration baseConf = (serviceConf != null) ? serviceConf : conf;

    final HiveConf hiveConf = new HiveConf(baseConf, this.getClass());
    final String metastoreUri = getMetastoreConnectURI(uri);
    if (!metastoreUri.isEmpty()) {
        hiveConf.set("hive.metastore.local", "false");
    }
    hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri);
    return hiveConf;
}
/**
 * Creates an HCat client for the metastore behind the given URI, without impersonating a user.
 *
 * @param uri the HCat URI whose metastore should be addressed
 * @param conf configuration passed on to {@code getHiveConf}
 * @return a new HCat client; the caller is responsible for closing it
 * @throws HCatAccessorException with error code E1501 if the client cannot be created
 */
private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException {
    final HiveConf hiveConf = getHiveConf(uri, conf);
    try {
        final XLog log = XLog.getLog(HCatURIHandler.class);
        final UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        final String metastoreUris = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname);
        log.info("Creating HCatClient for login_user [{0}] and server [{1}] ", loginUser, metastoreUris);
        return HCatClient.create(hiveConf);
    }
    catch (HCatException hcatError) {
        throw new HCatAccessorException(ErrorCode.E1501, hcatError);
    }
    catch (IOException ioError) {
        throw new HCatAccessorException(ErrorCode.E1501, ioError);
    }
}
/**
 * Creates an HCat client that acts as the given user through a proxy-user UGI.
 * <p>
 * When the metastore is configured for SASL, a delegation token is first fetched with a
 * short-lived client running as the login user, added to the proxy UGI, and returned alongside
 * the client so the caller can cancel it later.
 * <p>
 * Failures are reported with the original exception attached as the cause, matching the other
 * methods in this class. If the thread is interrupted during {@code doAs}, the interrupt flag
 * is restored before the exception is thrown.
 *
 * @param uri the HCat URI whose metastore should be addressed
 * @param conf configuration passed on to {@code getHiveConf}
 * @param user the user to impersonate
 * @return the client together with its delegation token ({@code null} token when SASL is off)
 * @throws HCatAccessorException with error code E1501 if the client cannot be created
 */
private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user)
        throws HCatAccessorException {
    final HiveConf hiveConf = getHiveConf(uri, conf);
    String delegationToken = null;
    try {
        // Get UGI to doAs() as the specified user
        UserGroupInformation ugi = UserGroupInformation.createProxyUser(user,
                UserGroupInformation.getLoginUser());
        // Define the label for the Delegation Token for the HCat instance.
        hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature");
        if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) {
            HCatClient tokenClient = null;
            try {
                // Retrieve Delegation token for HCatalog
                tokenClient = HCatClient.create(hiveConf);
                delegationToken = tokenClient.getDelegationToken(user,
                        UserGroupInformation.getLoginUser().getUserName());
                // Store Delegation token in the UGI
                Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
                token.decodeFromUrlString(delegationToken);
                token.setService(new Text(hiveConf.get("hive.metastore.token.signature")));
                ugi.addToken(token);
            }
            finally {
                if (tokenClient != null) {
                    tokenClient.close();
                }
            }
        }
        XLog.getLog(HCatURIHandler.class).info(
                "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
                UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
        // NOTE(review): if client creation below fails, the delegation token obtained above is
        // not cancelled here - confirm whether it should be.
        HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
            @Override
            public HCatClient run() throws Exception {
                return HCatClient.create(hiveConf);
            }
        });
        return new HCatClientWithToken(hcatClient, delegationToken);
    }
    catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new HCatAccessorException(ErrorCode.E1501, e);
    }
    catch (Exception e) {
        // Pass the exception itself (not just its message) so the cause is not lost.
        throw new HCatAccessorException(ErrorCode.E1501, e);
    }
}
/**
 * Derives the metastore connect URI for the given HCat URI.
 * <p>
 * The authority "unittest-local" maps to an empty string (used by unit tests); every other URI
 * is parsed as an {@link HCatURI} and its server end point is returned with the "thrift" scheme.
 * A URI without an authority is handled by the regular parsing path instead of failing with a
 * {@code NullPointerException}.
 *
 * @param uri the HCat URI
 * @return the metastore URI, or an empty string for the unit-test authority
 * @throws HCatAccessorException with error code E0902 if the URI cannot be parsed
 */
private String getMetastoreConnectURI(URI uri) throws HCatAccessorException {
    // For unit tests. Constant-first equals because URI.getAuthority() may return null.
    if ("unittest-local".equals(uri.getAuthority())) {
        return "";
    }
    // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
    // is added
    try {
        HCatURI hCatURI = new HCatURI(uri.toString());
        return hCatURI.getServerEndPointWithScheme("thrift");
    }
    catch (URISyntaxException e) {
        throw new HCatAccessorException(ErrorCode.E0902, e);
    }
}
/**
 * Checks whether the given URI resolves to at least one partition (when it carries partition
 * keys) or at least one table (when it does not).
 *
 * @param uri the HCat URI to check
 * @param client the client used for the lookup
 * @param closeClient whether {@code client} should be closed before returning
 * @return true if a matching partition or table was found
 * @throws HCatAccessorException E1501 on connection failure, E0902 on other HCat or URI errors
 */
private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
    try {
        final HCatURI target = new HCatURI(uri.toString());
        if (target.getPartitionMap().isEmpty()) {
            final List<String> matchingTables = client.listTableNamesByPattern(target.getDb(),
                    target.getTable());
            return !(matchingTables == null || matchingTables.isEmpty());
        }
        final List<HCatPartition> matchingPartitions = client.getPartitions(target.getDb(),
                target.getTable(), target.getPartitionMap());
        return !(matchingPartitions == null || matchingPartitions.isEmpty());
    }
    catch (ConnectionFailureException connectionError) {
        throw new HCatAccessorException(ErrorCode.E1501, connectionError);
    }
    catch (HCatException hcatError) {
        throw new HCatAccessorException(ErrorCode.E0902, hcatError);
    }
    catch (URISyntaxException syntaxError) {
        throw new HCatAccessorException(ErrorCode.E0902, syntaxError);
    }
    finally {
        closeQuietly(client, null, closeClient);
    }
}
/**
 * Cancels the delegation token (if one is given) and closes the client, logging instead of
 * propagating any failure.
 * <p>
 * Cancellation and close are guarded separately so that a failure to cancel the token does not
 * prevent the client from being closed.
 *
 * @param client the client to close; may be {@code null}, in which case nothing happens
 * @param delegationToken token to cancel before closing; {@code null} or empty to skip
 * @param close when {@code false} the method does nothing
 */
private void closeQuietly(HCatClient client, String delegationToken, boolean close) {
    if (!close || client == null) {
        return;
    }
    if (delegationToken != null && !delegationToken.isEmpty()) {
        try {
            client.cancelDelegationToken(delegationToken);
        }
        catch (Exception e) {
            // Best effort: still fall through to close the client below.
            XLog.getLog(HCatURIHandler.class).warn("Error cancelling delegation token", e);
        }
    }
    try {
        client.close();
    }
    catch (Exception e) {
        XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", e);
    }
}
/**
 * Pairs an HCat client with the delegation token that was obtained for it, so that both can be
 * handed around and released together.
 */
class HCatClientWithToken {
    private final HCatClient hcatClient;
    private final String token;

    /**
     * @param client the HCat client
     * @param delegationToken the delegation token obtained for the client; may be {@code null}
     */
    public HCatClientWithToken(HCatClient client, String delegationToken) {
        hcatClient = client;
        token = delegationToken;
    }

    /**
     * @return the wrapped HCat client
     */
    public HCatClient getHCatClient() {
        return hcatClient;
    }

    /**
     * @return the delegation token stored with the client; may be {@code null}
     */
    public String getDelegationToken() {
        return token;
    }
}
static class HCatContext extends Context {
private HCatClient hcatClient;
private String delegationToken;
/**
* Create a HCatContext that can be used to access a hcat URI
*
* @param conf Configuration to access the URI
* @param user name of the user the URI should be accessed as
* @param hcatClient HCatClient to talk to hcatalog server
*/
public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
super(conf, user);
this.hcatClient = hcatClient;
}
public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) {
super(conf, user);
this.hcatClient = hcatClient.getHCatClient();
this.delegationToken = hcatClient.getDelegationToken();
}
/**
* Get the HCatClient to talk to hcatalog server
*
* @return HCatClient to talk to hcatalog server
*/
public HCatClient getHCatClient() {
return hcatClient;
}
/**
* Get the Delegation token to access HCat
*
* @return delegationToken
*/
public String getDelegationToken() {
return delegationToken;
}
@Override
public void destroy() {
try {
if (delegationToken != null && !delegationToken.isEmpty()) {
hcatClient.cancelDelegationToken(delegationToken);
}
delegationToken = null;
}
catch (Exception ignore) {
XLog.getLog(HCatContext.class).warn("Error cancelling delegation token", ignore);
}
try {
hcatClient.close();
}
catch (Exception ignore) {
XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
}
}
}
}