blob: 0178ccb15b0e1d9eef133202961c8c4086e22e49 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.datasource.Credential;
import org.apache.falcon.entity.v0.datasource.Credentialtype;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.datasource.DatasourceType;
import org.apache.falcon.entity.v0.datasource.Driver;
import org.apache.falcon.entity.v0.datasource.Interface;
import org.apache.falcon.entity.v0.datasource.Interfaces;
import org.apache.falcon.entity.v0.datasource.Interfacetype;
import org.apache.falcon.entity.v0.datasource.PasswordAliasType;
import org.apache.falcon.entity.v0.datasource.Property;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.conf.Configuration;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CredentialProviderHelper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
/**
* DataSource entity helper methods.
*/
public final class DatasourceHelper {
public static final String HADOOP_CREDENTIAL_PROVIDER_FILEPATH = "hadoop.security.credential.provider.path";
private static final Logger LOG = LoggerFactory.getLogger(DatasourceHelper.class);
private static final ConfigurationStore STORE = ConfigurationStore.get();
public static DatasourceType getDatasourceType(String datasourceName) throws FalconException {
return getDatasource(datasourceName).getType();
}
private DatasourceHelper() {}
public static Datasource getDatasource(String datasourceName) throws FalconException {
return STORE.get(EntityType.DATASOURCE, datasourceName);
}
public static String getReadOnlyEndpoint(Datasource datasource) {
return getInterfaceEndpoint(datasource, Interfacetype.READONLY);
}
public static String getWriteEndpoint(Datasource datasource) {
return getInterfaceEndpoint(datasource, Interfacetype.WRITE);
}
/**
* Returns user name and password pair as it is specified in the XML. If the credential type is
* password-file, the path name is returned.
*
* @param db
* @return Credential
* @throws FalconException
*/
public static Credential getReadPasswordInfo(Datasource db) throws FalconException {
for (Interface ifs : db.getInterfaces().getInterfaces()) {
if ((ifs.getType() == Interfacetype.READONLY) && (ifs.getCredential() != null)) {
return ifs.getCredential();
}
}
return getDefaultPasswordInfo(db.getInterfaces());
}
public static Credential getWritePasswordInfo(Datasource db) throws FalconException {
for (Interface ifs : db.getInterfaces().getInterfaces()) {
if ((ifs.getType() == Interfacetype.WRITE) && (ifs.getCredential() != null)) {
return ifs.getCredential();
}
}
return getDefaultPasswordInfo(db.getInterfaces());
}
/**
* Returns user name and actual password pair. If the credential type is password-file, then the
* password is read from the HDFS file. If the credential type is password-text, the clear text
* password is returned.
*
* @param db
* @return
* @throws FalconException
*/
public static java.util.Properties fetchReadPasswordInfo(Datasource db) throws FalconException {
Credential cred = getReadPasswordInfo(db);
return fetchPasswordInfo(cred);
}
public static java.util.Properties fetchWritePasswordInfo(Datasource db) throws FalconException {
Credential cred = getWritePasswordInfo(db);
return fetchPasswordInfo(cred);
}
public static java.util.Properties fetchPasswordInfo(Credential cred) throws FalconException {
java.util.Properties p = new java.util.Properties();
p.put("user", cred.getUserName());
if (cred.getType() == Credentialtype.PASSWORD_TEXT) {
p.put("password", cred.getPasswordText());
} else if (cred.getType() == Credentialtype.PASSWORD_FILE) {
String actualPasswd = fetchPasswordInfoFromFile(cred.getPasswordFile());
p.put("password", actualPasswd);
} else if (cred.getType() == Credentialtype.PASSWORD_ALIAS) {
String actualPasswd = fetchPasswordInfoFromCredentialStore(cred.getPasswordAlias());
p.put("password", actualPasswd);
}
return p;
}
public static String buildJceksProviderPath(URI credURI) {
StringBuilder sb = new StringBuilder();
final String credProviderPath = sb.append("jceks:").append("//")
.append(credURI.getScheme()).append("@")
.append(credURI.getHost())
.append(credURI.getPath()).toString();
return credProviderPath;
}
/**
* checks if two datasource interfaces are same.
*
* @param oldEntity old datasource entity
* @param newEntity new datasource entity
* @param ifacetype type of interface
* @return true if same else false
*/
public static boolean isSameInterface(Datasource oldEntity, Datasource newEntity, Interfacetype ifacetype) {
LOG.debug("Verifying if Interfaces match for Datasource {} : Old - {}, New - {}", oldEntity, newEntity);
Interface oIface = getInterface(oldEntity, ifacetype);
Interface nIface = getInterface(newEntity, ifacetype);
if ((oIface == null) && (nIface == null)) {
return true;
}
if ((oIface == null) || (nIface == null)) {
return false;
}
return (StringUtils.equals(oIface.getEndpoint(), nIface.getEndpoint())
&& isSameDriverClazz(oIface.getDriver(), nIface.getDriver())
&& isSameCredentials(oIface.getCredential(), nIface.getCredential()));
}
/**
* check if datasource driver is same.
* @param oldEntity
* @param newEntity
* @return true if same or false
*/
public static boolean isSameDriverClazz(Driver oldEntity, Driver newEntity) {
if ((oldEntity == null) && (newEntity == null)) {
return true;
}
if ((oldEntity == null) || (newEntity == null)) {
return false;
}
return StringUtils.equals(oldEntity.getClazz(), newEntity.getClazz());
}
/**
* checks if data source properties are same.
* @param oldEntity
* @param newEntity
* @return true if same else false
*/
public static boolean isSameProperties(Datasource oldEntity, Datasource newEntity) {
Map<String, String> oldProps = getDatasourceProperties(oldEntity);
Map<String, String> newProps = getDatasourceProperties(newEntity);
return oldProps.equals(newProps);
}
/**
* checks if data source credentials are same.
* @param oCred
* @param nCred
* @return true true
*/
public static boolean isSameCredentials(Credential oCred, Credential nCred) {
if ((oCred == null) && (nCred == null)) {
return true;
}
if ((oCred == null) || (nCred == null)) {
return true;
}
if (StringUtils.equals(oCred.getUserName(), nCred.getUserName())) {
if (oCred.getType() == nCred.getType()) {
if (oCred.getType() == Credentialtype.PASSWORD_TEXT) {
return StringUtils.equals(oCred.getPasswordText(), nCred.getPasswordText());
} else if (oCred.getType() == Credentialtype.PASSWORD_FILE) {
return StringUtils.equals(oCred.getPasswordFile(), nCred.getPasswordFile());
} else if (oCred.getType() == Credentialtype.PASSWORD_ALIAS) {
return (StringUtils.equals(oCred.getPasswordAlias().getAlias(),
nCred.getPasswordAlias().getAlias())
&& StringUtils.equals(oCred.getPasswordAlias().getProviderPath(),
nCred.getPasswordAlias().getProviderPath()));
}
} else {
return false;
}
}
return false;
}
public static Credential getCredential(Datasource db) {
return getCredential(db, null);
}
public static Credential getCredential(Datasource db, Interfacetype interfaceType) {
if (interfaceType == null) {
return db.getInterfaces().getCredential();
} else {
for(Interface iface : db.getInterfaces().getInterfaces()) {
if (iface.getType() == interfaceType) {
return iface.getCredential();
}
}
}
return null;
}
public static void validateCredential(Credential cred) throws FalconException {
if (cred == null) {
return;
}
switch (cred.getType()) {
case PASSWORD_TEXT:
if (StringUtils.isBlank(cred.getUserName()) || StringUtils.isBlank(cred.getPasswordText())) {
throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s'",
cred.getType().value(), "userName", "passwordText"));
}
break;
case PASSWORD_FILE:
if (StringUtils.isBlank(cred.getUserName()) || StringUtils.isBlank(cred.getPasswordFile())) {
throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s'",
cred.getType().value(), "userName", "passwordFile"));
}
break;
case PASSWORD_ALIAS:
if (StringUtils.isBlank(cred.getUserName()) || (cred.getPasswordAlias() == null)
|| StringUtils.isBlank(cred.getPasswordAlias().getAlias())
|| StringUtils.isBlank(cred.getPasswordAlias().getProviderPath())) {
throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s' or %s'",
cred.getType().value(), "userName", "alias", "providerPath"));
}
break;
default:
throw new FalconException(String.format("Unknown Credential type '%s'", cred.getType().value()));
}
}
/**
* Return the Interface endpoint for the interface type specified in the argument.
*
* @param db
* @param type - can be read-only or write
* @return
*/
private static String getInterfaceEndpoint(Datasource db, Interfacetype type) {
if (getInterface(db, type) != null) {
return getInterface(db, type).getEndpoint();
} else {
return null;
}
}
private static Interface getInterface(Datasource db, Interfacetype type) {
for(Interface ifs : db.getInterfaces().getInterfaces()) {
if (ifs.getType() == type) {
return ifs;
}
}
return null;
}
private static Credential getDefaultPasswordInfo(Interfaces ifs) throws FalconException {
if (ifs.getCredential() != null) {
return ifs.getCredential();
} else {
throw new FalconException("Missing Interfaces default credential");
}
}
/**
* fetch password from the corresponding store.
* @param c
* @return actual password
* @throws FalconException
*/
private static String fetchPasswordInfoFromCredentialStore(final PasswordAliasType c) throws FalconException {
try {
final String credPath = c.getProviderPath();
final URI credURI = new URI(credPath);
if (StringUtils.isBlank(credURI.getScheme())
|| StringUtils.isBlank(credURI.getHost())
|| StringUtils.isBlank(credURI.getPath())) {
throw new FalconException("Password alias jceks provider HDFS path is incorrect.");
}
final String alias = c.getAlias();
if (StringUtils.isBlank(alias)) {
throw new FalconException("Password alias is empty.");
}
final String credProviderPath = buildJceksProviderPath(credURI);
LOG.info("Credential provider HDFS path : " + credProviderPath);
if (CredentialProviderHelper.isProviderAvailable()) {
UserGroupInformation ugi = CurrentUser.getProxyUGI();
String password = ugi.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws Exception {
final Configuration conf = new Configuration();
conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, credPath);
conf.set(CredentialProviderHelper.CREDENTIAL_PROVIDER_PATH, credProviderPath);
FileSystem fs = FileSystem.get(credURI, conf);
if (!fs.exists(new Path(credPath))) {
String msg = String.format("Credential provider hdfs path [%s] does not "
+ "exist or access denied!", credPath);
LOG.error(msg);
throw new FalconException(msg);
}
return CredentialProviderHelper.resolveAlias(conf, alias);
}
});
return password;
} else {
throw new FalconException("Credential Provider is not initialized");
}
} catch (Exception ioe) {
String msg = "Exception while trying to fetch credential alias";
LOG.error(msg, ioe);
throw new FalconException(msg, ioe);
}
}
/**
* fetch the password from file.
*
* @param passwordFilePath
* @return
* @throws FalconException
*/
private static String fetchPasswordInfoFromFile(String passwordFilePath) throws FalconException {
try {
Path path = new Path(passwordFilePath);
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
if (!fs.exists(path)) {
throw new IOException("The password file does not exist! ");
}
if (!fs.isFile(path)) {
throw new IOException("The password file cannot be a directory! ");
}
InputStream is = fs.open(path);
StringWriter writer = new StringWriter();
try {
IOUtils.copy(is, writer);
return writer.toString();
} finally {
IOUtils.closeQuietly(is);
IOUtils.closeQuietly(writer);
fs.close();
}
} catch (IOException ioe) {
LOG.error("Error reading password file from HDFS : " + ioe);
throw new FalconException(ioe);
}
}
/*
returns data store properties
*/
public static Map<String, String> getDatasourceProperties(final Datasource datasource) {
Map<String, String> returnProps = new HashMap<String, String>();
if (datasource.getProperties() != null) {
for (Property prop : datasource.getProperties().getProperties()) {
returnProps.put(prop.getName(), prop.getValue());
}
}
return returnProps;
}
}