// blob: 358fbce7ac7deca4b01a2fc9ae42c377cb2cc57e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity.parser;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.DatasourceHelper;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.datasource.ACL;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.datasource.Interfacetype;
import org.apache.falcon.util.HdfsClassLoader;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.util.Arrays;
import java.util.Properties;
/**
* Parser for DataSource entity definition.
*/
public class DatasourceEntityParser extends EntityParser<Datasource> {

    private static final Logger LOG = LoggerFactory.getLogger(DatasourceEntityParser.class);

    /**
     * Creates a parser registered for the {@link EntityType#DATASOURCE} entity type.
     */
    public DatasourceEntityParser() {
        super(EntityType.DATASOURCE);
    }

    /**
     * Validates a datasource entity definition.
     *
     * <p>The checks run in this order: a class loader is obtained for the driver jars,
     * the default, read-only and write credentials are validated, a connection is attempted
     * against the read-only and write endpoints (when they are configured), and finally the
     * ACL is checked.
     *
     * @param db datasource entity to validate
     * @throws FalconException if any check fails; an {@link IOException} raised while
     *                         obtaining the driver class loader is reported as a
     *                         {@link ValidationException} carrying the original cause
     */
    @Override
    public void validate(Datasource db) throws FalconException {
        try {
            ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars());
            DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db));
            DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db, Interfacetype.READONLY));
            DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db, Interfacetype.WRITE));
            validateInterface(db, Interfacetype.READONLY, hdfsClassLoader);
            validateInterface(db, Interfacetype.WRITE, hdfsClassLoader);
            validateACL(db);
        } catch (IOException io) {
            // Keep the IOException as the cause so the underlying failure is not lost.
            throw new ValidationException("Unable to copy driver jars to local dir: "
                    + Arrays.toString(db.getDriver().getJars().toArray()), io);
        }
    }

    /**
     * Validates one interface of the datasource by opening a connection to its endpoint.
     *
     * <p>Only the {@code READONLY} and {@code WRITE} interface types resolve an endpoint here;
     * for any other type, or when the resolved endpoint is blank, no connection is attempted.
     *
     * @param db              datasource entity being validated
     * @param interfacetype   interface whose endpoint should be checked
     * @param hdfsClassLoader class loader used to load the JDBC driver class
     * @throws ValidationException if resolving the endpoint or credentials, or connecting
     *                             to the endpoint, fails
     */
    private static void validateInterface(Datasource db, Interfacetype interfacetype, ClassLoader hdfsClassLoader)
        throws ValidationException {
        String endpoint = null;
        Properties userPasswdInfo = null;
        try {
            if (interfacetype == Interfacetype.READONLY) {
                endpoint = DatasourceHelper.getReadOnlyEndpoint(db);
                userPasswdInfo = DatasourceHelper.fetchReadPasswordInfo(db);
            } else if (interfacetype == Interfacetype.WRITE) {
                endpoint = DatasourceHelper.getWriteEndpoint(db);
                userPasswdInfo = DatasourceHelper.fetchWritePasswordInfo(db);
            }
            if (StringUtils.isNotBlank(endpoint)) {
                LOG.info("Validating {} endpoint {} connection.", interfacetype.value(), endpoint);
                validateConnection(hdfsClassLoader, db.getDriver().getClazz(), endpoint, userPasswdInfo);
            }
        } catch (FalconException fe) {
            // Chain the original exception so its stack trace survives the translation.
            throw new ValidationException(String.format("Cannot validate '%s' "
                            + "interface '%s' " + "of database entity '%s' due to '%s' ",
                    interfacetype, endpoint,
                    db.getName(), fe.getMessage()), fe);
        }
    }

    /**
     * Checks that a connection can be opened to {@code connectUrl} with the given driver.
     *
     * <p>The thread's context class loader is switched to {@code hdfsClassLoader} for the
     * duration of the check and restored afterwards. Any connection that was opened is
     * closed before the method returns.
     *
     * @param hdfsClassLoader class loader from which the driver class is loaded
     * @param driverClass     fully qualified name of the {@link java.sql.Driver} implementation
     * @param connectUrl      URL handed to the driver
     * @param userPasswdInfo  connection properties handed to the driver
     * @throws FalconException if the driver cannot be loaded or instantiated, the connection
     *                         attempt fails, or the driver returns {@code null}
     */
    private static void validateConnection(ClassLoader hdfsClassLoader, String driverClass,
                                           String connectUrl, Properties userPasswdInfo)
        throws FalconException {
        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
        // The context class loader may legitimately be null; let the logger format it
        // instead of calling toString() on it.
        LOG.info("Preserving current classloader: {}", previousClassLoader);
        Connection con = null;
        try {
            Thread.currentThread().setContextClassLoader(hdfsClassLoader);
            LOG.info("Setting context classloader to : {}", hdfsClassLoader);
            java.sql.Driver driver = (java.sql.Driver) hdfsClassLoader.loadClass(driverClass).newInstance();
            LOG.info("Validating connection URL: {} using driver: {}", connectUrl, driver.getClass().toString());
            con = driver.connect(connectUrl, userPasswdInfo);
            if (con == null) {
                throw new FalconException("DriverManager.getConnection() return "
                        + "null for URL : " + connectUrl);
            }
        } catch (FalconException fe) {
            // Already the right type: rethrow as is rather than wrapping it a second time.
            LOG.error("Exception while validating connection : ", fe);
            throw fe;
        } catch (Exception ex) {
            LOG.error("Exception while validating connection : ", ex);
            throw new FalconException(ex);
        } finally {
            try {
                // The connection exists only to prove connectivity; release it while the
                // driver's class loader is still the context class loader.
                if (con != null) {
                    con.close();
                }
            } catch (java.sql.SQLException closeFailure) {
                // Failing to close does not invalidate a successful connectivity check.
                LOG.warn("Unable to close connection opened to validate URL: " + connectUrl, closeFailure);
            } finally {
                Thread.currentThread().setContextClassLoader(previousClassLoader);
                LOG.info("Restoring original classloader {}", previousClassLoader);
            }
        }
    }

    /**
     * Validate ACL if authorization is enabled.
     *
     * <p>When authorization is enabled the entity must declare an ACL, its owner and group
     * are validated, and the ACL is passed to {@code authorize} together with the entity name.
     *
     * @param db database entity
     * @throws ValidationException if the ACL is missing, the owner/group validation fails,
     *                             or authorization is denied
     */
    private void validateACL(Datasource db) throws ValidationException {
        if (isAuthorizationDisabled) {
            return;
        }

        // Validate the entity owner is logged-in, authenticated user if authorization is enabled
        final ACL dbACL = db.getACL();
        if (dbACL == null) {
            throw new ValidationException("Datasource ACL cannot be empty for: " + db.getName());
        }

        validateACLOwnerAndGroup(dbACL);

        try {
            authorize(db.getName(), dbACL);
        } catch (AuthorizationException e) {
            throw new ValidationException(e);
        }
    }
}