// blob: 3817cba35aa578fbf0ee201683f11561c8f94708 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.tests.e2e.sqoop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.common.test.db.DatabaseProviderFactory;
import org.apache.sqoop.common.test.db.TableName;
import org.apache.sqoop.common.test.utils.NetworkUtils;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.test.minicluster.SqoopMiniCluster;
import org.apache.sqoop.validation.Status;
import org.codehaus.cargo.container.ContainerType;
import org.codehaus.cargo.container.InstalledLocalContainer;
import org.codehaus.cargo.container.configuration.ConfigurationType;
import org.codehaus.cargo.container.configuration.LocalConfiguration;
import org.codehaus.cargo.container.deployable.WAR;
import org.codehaus.cargo.container.installer.Installer;
import org.codehaus.cargo.container.installer.ZipURLInstaller;
import org.codehaus.cargo.container.property.ServletPropertySet;
import org.codehaus.cargo.container.tomcat.TomcatPropertySet;
import org.codehaus.cargo.generic.DefaultContainerFactory;
import org.codehaus.cargo.generic.configuration.DefaultConfigurationFactory;
import com.google.common.base.Joiner;
public class TomcatSqoopRunner {
/** Logger used by the runner and by the nested server wrapper. */
private static final Logger LOG = Logger.getLogger(TomcatSqoopRunner.class);
/** Tomcat-hosted Sqoop server wrapper; assigned once in the constructor. */
private final SqoopServerEnableSentry server;
/** Database provider looked up from the JVM system properties; assigned once in the constructor. */
private final DatabaseProvider provider;
/** Base directory passed to the server; also the parent of the "/output" job input directory. */
private final String temporaryPath;
public TomcatSqoopRunner(String temporaryPath, String serverName, String sentrySite)
throws Exception {
this.temporaryPath = temporaryPath;
this.server = new SqoopServerEnableSentry(temporaryPath, serverName, sentrySite);
this.provider = DatabaseProviderFactory.getProvider(System.getProperties());
}
/**
 * Starts the Sqoop server first and the database provider second.
 *
 * @throws Exception if either component fails to start
 */
public void start() throws Exception {
  this.server.start();
  this.provider.start();
}
/**
 * Stops the Sqoop server and then the database provider.
 *
 * The provider is stopped even when stopping the server throws, so a failing
 * server shutdown no longer skips the provider shutdown.
 *
 * @throws Exception if either component fails to stop
 */
public void stop() throws Exception {
  try {
    server.stop();
  } finally {
    // Always runs, even when server.stop() failed.
    provider.stop();
  }
}
/**
 * Create link.
 *
 * Asserts that the client reports {@code Status.OK} and that the link no
 * longer carries the default persistence id afterwards.
 *
 * @param client Sqoop client used to save the link
 * @param link link to save
 */
public void saveLink(SqoopClient client, MLink link) {
  Status saveStatus = client.saveLink(link);
  assertEquals(Status.OK, saveStatus);
  // NOTE(review): assertNotSame compares references; if the ids are primitive
  // longs this relies on boxing behaviour - confirm a value comparison is not needed.
  assertNotSame(MPersistableEntity.PERSISTANCE_ID_DEFAULT, link.getPersistenceId());
}
/**
 * Update link.
 *
 * Asserts that the client reports {@code Status.OK} for the update and that
 * the link does not carry the default persistence id.
 *
 * @param client Sqoop client used to update the link
 * @param link link to update
 */
public void updateLink(SqoopClient client, MLink link) {
  Status updateStatus = client.updateLink(link);
  assertEquals(Status.OK, updateStatus);
  assertNotSame(MPersistableEntity.PERSISTANCE_ID_DEFAULT, link.getPersistenceId());
}
/**
 * Create job.
 *
 * Asserts that the client reports {@code Status.OK} and that the job no
 * longer carries the default persistence id afterwards.
 *
 * @param client Sqoop client used to save the job
 * @param job job to save
 */
public void saveJob(SqoopClient client, MJob job) {
  Status saveStatus = client.saveJob(job);
  assertEquals(Status.OK, saveStatus);
  assertNotSame(MPersistableEntity.PERSISTANCE_ID_DEFAULT, job.getPersistenceId());
}
/**
 * Fill HDFS link config.
 *
 * Sets the "linkConfig.confDir" input of the connector link config to the
 * configuration path reported by the server. No assertions are made.
 *
 * @param link MLink object to fill
 */
public void fillHdfsLink(MLink link) {
  link.getConnectorLinkConfig()
      .getStringInput("linkConfig.confDir")
      .setValue(server.getConfigurationPath());
}
/**
 * Fill RDBMS link config from the currently active database provider.
 *
 * Copies the provider's JDBC driver, connection URL, username and password
 * into the matching "linkConfig.*" inputs.
 *
 * @param link MLink object to fill
 */
public void fillRdbmsLinkConfig(MLink link) {
  MConfigList linkConfig = link.getConnectorLinkConfig();
  // How to connect
  linkConfig.getStringInput("linkConfig.jdbcDriver").setValue(provider.getJdbcDriver());
  linkConfig.getStringInput("linkConfig.connectionString").setValue(provider.getConnectionUrl());
  // Who connects
  linkConfig.getStringInput("linkConfig.username").setValue(provider.getConnectionUsername());
  linkConfig.getStringInput("linkConfig.password").setValue(provider.getConnectionPassword());
}
/**
 * Fill the FROM side of a job for HDFS.
 *
 * Points "fromJobConfig.inputDirectory" at the "/output" directory below the
 * runner's temporary path.
 *
 * @param job MJob object to fill
 */
public void fillHdfsFromConfig(MJob job) {
  job.getFromJobConfig()
      .getStringInput("fromJobConfig.inputDirectory")
      .setValue(temporaryPath + "/output");
}
/**
 * Fill the TO side of a job for an RDBMS target.
 *
 * The table name is built from the simple name of this object's runtime
 * class and escaped by the active database provider before it is stored in
 * "toJobConfig.tableName".
 *
 * @param job MJob object to fill
 */
public void fillRdbmsToConfig(MJob job) {
  job.getToJobConfig()
      .getStringInput("toJobConfig.tableName")
      .setValue(
          provider.escapeTableName(new TableName(getClass().getSimpleName()).getTableName()));
}
/**
 * Get a SqoopClient for a specific user.
 *
 * The user is first registered through {@code setAuthenticationUser}, then a
 * new client pointing at the server URL is created.
 *
 * @param user user name the client should act as
 * @return a new client bound to this runner's server URL
 */
public SqoopClient getSqoopClient(String user) {
  setAuthenticationUser(user);
  String serverUrl = server.getServerUrl();
  return new SqoopClient(serverUrl);
}
/**
 * Set the mock user for the Sqoop simple authentication.
 *
 * This overwrites the JVM-wide "user.name" system property, so the change is
 * visible to all code running in this JVM.
 *
 * @param user user name to store in the system property
 */
private void setAuthenticationUser(String user) {
  final String userNameProperty = "user.name";
  System.setProperty(userNameProperty, user);
}
private static class SqoopServerEnableSentry extends SqoopMiniCluster {
/** Relative path of the Sqoop web application archive deployed into Tomcat. */
private static final String WAR_PATH = "thirdparty/sqoop.war";
/** Relative path of the Tomcat distribution zip that gets installed on start. */
private static final String TOMCAT_PATH = "thirdparty/apache-tomcat-6.0.36.zip";
/** Cargo container; stays null until start() has created it. */
private InstalledLocalContainer container = null;
/** HTTP port the servlet container listens on; chosen in the constructor. */
private final Integer port;
/** Tomcat AJP port; chosen in the constructor. */
private final Integer ajpPort;
/** Value published under the "sentry.sqoop.site.url" property. */
private final String sentrySite;
/** Value published under the Sqoop authorization "server_name" property. */
private final String serverName;
/**
 * Creates the server wrapper and picks the ports it will listen on.
 *
 * @param temporaryPath base directory forwarded to the mini cluster superclass
 * @param serverName server name later written into the security configuration
 * @param sentrySite Sentry site location later written into the security configuration
 * @throws Exception declared for the superclass constructor and the port lookup
 */
SqoopServerEnableSentry(String temporaryPath, String serverName, String sentrySite)
    throws Exception {
  super(temporaryPath);
  // Random ports: HTTP first, AJP second.
  // NOTE(review): if findAvailablePort() only probes and does not reserve the
  // port, both calls may return the same number - confirm against NetworkUtils.
  this.port = NetworkUtils.findAvailablePort();
  this.ajpPort = NetworkUtils.findAvailablePort();
  // Sentry specific settings consumed by getSecurityConfiguration()
  this.serverName = serverName;
  this.sentrySite = sentrySite;
}
/**
 * Builds the security related server properties.
 *
 * Starts from an empty map, adds the authentication settings and then the
 * Sentry authorization settings.
 *
 * @return a new mutable map holding the security properties
 */
@Override
public Map<String, String> getSecurityConfiguration() {
  Map<String, String> securitySettings = new HashMap<String, String>();
  configureAuthentication(securitySettings);
  configureSentryAuthorization(securitySettings);
  return securitySettings;
}
/**
 * Adds the simple authentication settings to the given properties.
 *
 * @param properties map that receives the authentication type and handler entries
 */
private void configureAuthentication(Map<String, String> properties) {
  // Simple Authentication
  final String authenticationType = "SIMPLE";
  final String authenticationHandler = "org.apache.sqoop.security.SimpleAuthenticationHandler";
  properties.put("org.apache.sqoop.authentication.type", authenticationType);
  properties.put("org.apache.sqoop.authentication.handler", authenticationHandler);
}
/**
 * Adds the Sentry authorization settings to the given properties.
 *
 * Registers the Sentry handler, access controller and validator classes, the
 * server name and the Sentry site location, and publishes the Sentry related
 * jars found on this JVM's class path under "org.apache.sqoop.classpath.extra".
 *
 * @param properties map that receives the authorization entries
 */
private void configureSentryAuthorization(Map<String, String> properties) {
  properties.put("org.apache.sqoop.security.authorization.handler",
      "org.apache.sentry.sqoop.authz.SentryAuthorizationHander");
  properties.put("org.apache.sqoop.security.authorization.access_controller",
      "org.apache.sentry.sqoop.authz.SentryAccessController");
  properties.put("org.apache.sqoop.security.authorization.validator",
      "org.apache.sentry.sqoop.authz.SentryAuthorizationValidator");
  properties.put("org.apache.sqoop.security.authorization.server_name", serverName);
  properties.put("sentry.sqoop.site.url", sentrySite);
  // Collect the Sentry related jars from the class path. java.class.path is
  // delimited by the platform path separator, which is not ":" everywhere.
  List<String> extraClassPath = new LinkedList<String>();
  for (String jar : System.getProperty("java.class.path").split(File.pathSeparator)) {
    boolean sentryRelated =
        jar.contains("sentry") || jar.contains("shiro-core") || jar.contains("libthrift");
    if (sentryRelated && jar.endsWith("jar")) {
      extraClassPath.add(jar);
    }
  }
  // NOTE(review): the joined value keeps ":" as before; confirm which delimiter
  // the Sqoop server expects when it reads this property.
  properties.put("org.apache.sqoop.classpath.extra", Joiner.on(":").join(extraClassPath));
}
/**
 * Substrings identifying class path entries that are handed to the Tomcat
 * container as extra class path. An entry is selected when it contains any
 * of these markers.
 */
private static final String[] CONTAINER_JAR_MARKERS = {
  "hadoop-", // Hadoop jars
  "hive-", // Hive jars
  "commons-", // Apache Commons libraries
  "httpcore-", // Apache Http Core libraries
  "httpclient-", // Apache Http Client libraries
  "htrace-", // htrace-core libraries, new added in Hadoop 2.6.0
  "zookeeper-", // zookeeper libraries, new added in Hadoop 2.6.0
  "curator-", // curator libraries, new added in Hadoop 2.6.0
  "log4j-", // Log4j
  "slf4j-", // Slf4j
  "jackson-", // Jackson
  "derby", // Derby drivers
  "avro-", // Avro
  "parquet-", // Parquet
  "mysql", // MySQL JDBC driver
  "postgre", // PostgreSQL JDBC driver
  "oracle", // Oracle driver
  "terajdbc", // Teradata driver
  "tdgs", // Teradata driver
  "nzjdbc", // Netezza driver
  "sqljdbc", // Microsoft SQL Server driver
  "datanucleus-", // Data nucleus libs
  "google" // Google libraries (guava, ...)
};

/**
 * Installs Tomcat from the bundled zip, deploys the Sqoop WAR and starts the
 * container.
 *
 * Returns immediately when a container has already been created by an
 * earlier call.
 *
 * @throws Exception if preparing the temporary path, installing Tomcat or
 *         starting the container fails
 */
@Override
public void start() throws Exception {
  // Container has already been started
  if (container != null) {
    return;
  }
  prepareTemporaryPath();
  // Source: http://cargo.codehaus.org/Functional+testing
  String tomcatPath = getTemporaryPath() + "/tomcat";
  String extractPath = tomcatPath + "/extract";
  String confPath = tomcatPath + "/conf";
  Installer installer =
      new ZipURLInstaller(new File(TOMCAT_PATH).toURI().toURL(), null, extractPath);
  installer.install();
  LocalConfiguration configuration = (LocalConfiguration) new DefaultConfigurationFactory()
      .createConfiguration("tomcat6x", ContainerType.INSTALLED, ConfigurationType.STANDALONE,
          confPath);
  container = (InstalledLocalContainer) new DefaultContainerFactory().createContainer("tomcat6x",
      ContainerType.INSTALLED, configuration);
  // Set home to our installed tomcat instance
  container.setHome(installer.getHome());
  // Store tomcat logs into file as they are quite handy for debugging
  container.setOutput(getTemporaryPath() + "/log/tomcat.log");
  // Propagate system properties to the container
  container.setSystemProperties(copySystemProperties());
  // Propagate Hadoop jars to the container classpath
  // In real world, they would be installed manually by user
  List<String> extraClassPath = new LinkedList<String>();
  // java.class.path is delimited by the platform path separator, which is
  // not ":" on every platform.
  for (String jar : System.getProperty("java.class.path").split(File.pathSeparator)) {
    if (isContainerDependency(jar)) {
      extraClassPath.add(jar);
    }
  }
  container.setExtraClasspath(extraClassPath.toArray(new String[extraClassPath.size()]));
  // Finally deploy Sqoop server war file
  configuration.addDeployable(new WAR(WAR_PATH));
  configuration.setProperty(ServletPropertySet.PORT, port.toString());
  configuration.setProperty(TomcatPropertySet.AJP_PORT, ajpPort.toString());
  // Debugging aid, intentionally left disabled:
  //configuration.setProperty(GeneralPropertySet.JVMARGS, "\"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8006\"");
  LOG.info("Tomcat extract path: " + extractPath);
  LOG.info("Tomcat home path: " + installer.getHome());
  LOG.info("Tomcat config home path: " + confPath);
  LOG.info("Starting tomcat server on port " + port);
  container.start();
}

/** Returns true when the class path entry contains one of CONTAINER_JAR_MARKERS. */
private static boolean isContainerDependency(String classPathEntry) {
  for (String marker : CONTAINER_JAR_MARKERS) {
    if (classPathEntry.contains(marker)) {
      return true;
    }
  }
  return false;
}

/** Copies the string-valued JVM system properties into a new map without unchecked casts. */
private static Map<String, String> copySystemProperties() {
  Map<String, String> copy = new HashMap<String, String>();
  for (String key : System.getProperties().stringPropertyNames()) {
    String value = System.getProperties().getProperty(key);
    // A property may have been removed since the name snapshot was taken.
    if (value != null) {
      copy.put(key, value);
    }
  }
  return copy;
}
/**
 * Stops the Tomcat container if one has been created.
 *
 * The container reference is kept afterwards, so a later call to start()
 * returns immediately without starting the container again.
 *
 * @throws Exception declared by the overridden method
 */
@Override
public void stop() throws Exception {
  if (container == null) {
    // start() never created a container, nothing to shut down
    return;
  }
  container.stop();
}
/**
 * Return server URL.
 *
 * @return the "http://localhost:&lt;port&gt;/sqoop/" URL for the port chosen in the constructor
 */
public String getServerUrl() {
  // We're not doing any changes, so return default URL
  StringBuilder url = new StringBuilder("http://localhost:");
  url.append(port);
  url.append("/sqoop/");
  return url.toString();
}
}
}