| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sqoop.manager.postgresql; |
| |
| import static org.apache.sqoop.manager.postgresql.PostgresqlTestUtil.CONNECT_STRING; |
| import static org.apache.sqoop.manager.postgresql.PostgresqlTestUtil.DATABASE_USER; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.sqoop.testcategories.sqooptest.ManualTest; |
| import org.apache.sqoop.testcategories.thirdpartytest.PostgresqlTest; |
| import org.junit.Test; |
| |
| import org.apache.sqoop.TestExport; |
| import org.apache.sqoop.mapreduce.db.DBConfiguration; |
| import org.junit.experimental.categories.Category; |
| |
| |
| /** |
| * Test the PGBulkloadManager implementations. |
| * PGBulkloadManager uses both JDBC driver and pg_bulkload to facilitate it. |
| * |
| * Since this requires a Postgresql installation on your local machine to use, |
| * this class is named in such a way that Hadoop's default QA process does not |
| * run it. |
| * |
| * You need to run this manually with -Dtestcase=PGBulkloadManagerManualTest. |
| * |
| * You need to put Postgresql's JDBC driver library into lib dir. |
| * |
| * You need to create a sqooptest superuser and database and tablespace, |
| * and install pg_bulkload for sqooptest database: |
| * |
| * $ sudo -u postgres createuser -U postgres -s sqooptest |
| * $ sudo -u postgres createdb -U sqooptest sqooptest |
| * $ sudo -u postgres mkdir /var/pgdata/stagingtablespace |
| * $ psql -U sqooptest |
| * -f /usr/local/share/postgresql/contrib/pg_bulkload.sql sqooptest |
| * $ psql -U sqooptest sqooptest |
| * sqooptest=# CREATE USER sqooptest; |
| * sqooptest=# CREATE DATABASE sqooptest; |
| * sqooptest=# CREATE TABLESPACE sqooptest |
| * LOCATION '/var/pgdata/stagingtablespace'; |
| * sqooptest=# \q |
| * |
| */ |
| @Category({ManualTest.class, PostgresqlTest.class}) |
| public class PGBulkloadManagerManualTest extends TestExport { |
| |
| public static final Log LOG = |
| LogFactory.getLog(PGBulkloadManagerManualTest.class.getName()); |
| private DBConfiguration dbConf; |
| static final String TABLESPACE = |
| System.getProperty("sqoop.test.postgresql.tablespace", "sqoop"); |
| static final String PG_BULKLOAD = |
| System.getProperty("sqoop.test.postgresql.pg_bulkload", "pg_bulkload"); |
| |
| public PGBulkloadManagerManualTest() { |
| JobConf conf = new JobConf(getConf()); |
| DBConfiguration.configureDB(conf, |
| "org.postgresql.Driver", |
| getConnectString(), |
| getUserName(), |
| (String) null, (Integer) null); |
| dbConf = new DBConfiguration(conf); |
| } |
| |
| |
| @Override |
| protected boolean useHsqldbTestServer() { |
| return false; |
| } |
| |
| |
| @Override |
| protected String getConnectString() { |
| return CONNECT_STRING; |
| } |
| |
| |
| protected String getUserName() { |
| return DATABASE_USER; |
| } |
| |
| |
| @Override |
| protected String getTablePrefix() { |
| return super.getTablePrefix().toLowerCase(); |
| } |
| |
| |
| @Override |
| protected String getTableName() { |
| return super.getTableName().toLowerCase(); |
| } |
| |
| @Override |
| public String getStagingTableName() { |
| return super.getStagingTableName().toLowerCase(); |
| } |
| |
| |
| @Override |
| protected Connection getConnection() { |
| try { |
| Connection conn = dbConf.getConnection(); |
| conn.setAutoCommit(false); |
| PreparedStatement stmt = |
| conn.prepareStatement("SET extra_float_digits TO 0"); |
| stmt.executeUpdate(); |
| conn.commit(); |
| return conn; |
| } catch (SQLException sqlE) { |
| LOG.error("Could not get connection to test server: " + sqlE); |
| return null; |
| } catch (ClassNotFoundException cnfE) { |
| LOG.error("Could not find driver class: " + cnfE); |
| return null; |
| } |
| } |
| |
| |
| @Override |
| protected String getDropTableStatement(String tableName) { |
| return "DROP TABLE IF EXISTS " + tableName; |
| } |
| |
| |
| @Override |
| protected String[] getArgv(boolean includeHadoopFlags, |
| int rowsPerStatement, |
| int statementsPerTx, |
| String... additionalArgv) { |
| ArrayList<String> args = |
| new ArrayList<String>(Arrays.asList(additionalArgv)); |
| args.add("-D"); |
| args.add("pgbulkload.bin=" + PG_BULKLOAD); |
| args.add("--username"); |
| args.add(getUserName()); |
| args.add("--connection-manager"); |
| args.add("org.apache.sqoop.manager.PGBulkloadManager"); |
| args.add("--staging-table"); |
| args.add("dummy"); |
| args.add("--clear-staging-table"); |
| return super.getArgv(includeHadoopFlags, |
| rowsPerStatement, |
| statementsPerTx, |
| args.toArray(new String[0])); |
| } |
| |
| |
| @Override |
| protected String [] getCodeGenArgv(String... extraArgs) { |
| ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs)); |
| args.add("--username"); |
| args.add(getUserName()); |
| return super.getCodeGenArgv(args.toArray(new String[0])); |
| } |
| |
| |
| @Override |
| public void testColumnsExport() throws IOException, SQLException { |
| // PGBulkloadManager does not support --columns option. |
| } |
| |
| @Test |
| public void testMultiReduceExport() throws IOException, SQLException { |
| multiFileTest(2, 10, 2, "-D", "mapred.reduce.tasks=2"); |
| } |
| |
| @Test |
| public void testMultiReduceExportWithNewProp() |
| throws IOException, SQLException { |
| multiFileTest(2, 10, 2, "-D", "mapreduce.job.reduces=2"); |
| } |
| |
| @Test |
| public void testExportWithTablespace() throws IOException, SQLException { |
| multiFileTest(1, 10, 1, |
| "-D", "pgbulkload.staging.tablespace=" + TABLESPACE); |
| } |
| } |