| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.cloudera.sqoop.manager; |
| |
| import com.cloudera.sqoop.SqoopOptions; |
| import com.cloudera.sqoop.testutil.CommonArgs; |
| import com.cloudera.sqoop.testutil.ExportJobTestCase; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.Writer; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * |
| */ |
| public class PostgresqlExportTest extends ExportJobTestCase { |
| public static final Log LOG = LogFactory.getLog( |
| PostgresqlExportTest.class.getName()); |
| |
| static final String HOST_URL = System.getProperty( |
| "sqoop.test.postgresql.connectstring.host_url", |
| "jdbc:postgresql://localhost/"); |
| static final String DATABASE_USER = System.getProperty( |
| "sqoop.test.postgresql.username", |
| "sqooptest"); |
| static final String DATABASE_NAME = System.getProperty( |
| "sqoop.test.postgresql.database", |
| "sqooptest"); |
| static final String PASSWORD = System.getProperty( |
| "sqoop.test.postgresql.password"); |
| |
| static final String TABLE_NAME = "EMPLOYEES_PG"; |
| static final String PROCEDURE_NAME = "INSERT_AN_EMPLOYEE"; |
| static final String STAGING_TABLE_NAME = "STAGING"; |
| static final String SCHEMA_PUBLIC = "public"; |
| static final String SCHEMA_SPECIAL = "special"; |
| static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; |
| |
| protected Connection connection; |
| |
| @Override |
| protected boolean useHsqldbTestServer() { |
| return false; |
| } |
| |
| private String getDropTableStatement(String tableName, String schema) { |
| return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName); |
| } |
| |
| @Before |
| public void setUp() { |
| super.setUp(); |
| |
| LOG.debug("Setting up postgresql test: " + CONNECT_STRING); |
| |
| try { |
| connection = DriverManager.getConnection(CONNECT_STRING, DATABASE_USER, PASSWORD); |
| connection.setAutoCommit(false); |
| } catch (SQLException ex) { |
| LOG.error("Can't create connection", ex); |
| throw new RuntimeException(ex); |
| } |
| |
| createTable(TABLE_NAME, SCHEMA_PUBLIC); |
| createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC); |
| createTable(TABLE_NAME, SCHEMA_SPECIAL); |
| createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL); |
| createProcedure(PROCEDURE_NAME, SCHEMA_PUBLIC); |
| |
| LOG.debug("setUp complete."); |
| } |
| |
| @Override |
| public void tearDown() { |
| try { |
| Statement stmt = connection.createStatement(); |
| stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC)); |
| stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_PUBLIC)); |
| stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_SPECIAL)); |
| stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_SPECIAL)); |
| } catch(SQLException e) { |
| LOG.error("Can't clean up the database:", e); |
| } |
| |
| super.tearDown(); |
| |
| try { |
| connection.close(); |
| } catch (SQLException e) { |
| LOG.error("Ignoring exception in tearDown", e); |
| } |
| } |
| |
| private interface CreateIt { |
| void createIt( |
| Statement st, |
| String fullName, |
| ConnManager manager) throws SQLException; |
| } |
| |
| private void createTable(String tableName, String schema) { |
| CreateIt createIt = new CreateIt() { |
| @Override |
| public void createIt( |
| Statement st, |
| String fullName, |
| ConnManager manager) throws SQLException { |
| st.executeUpdate("CREATE TABLE " + fullName + " (" |
| + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " |
| + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " |
| + manager.escapeColName("start_date") + " DATE, " |
| + manager.escapeColName("salary") + " FLOAT, " |
| + manager.escapeColName("dept") + " VARCHAR(32))"); |
| } |
| }; |
| create(tableName, "TABLE", schema, createIt); |
| } |
| |
| private void createProcedure(String procedureName, String schema) { |
| CreateIt createIt = new CreateIt() { |
| @Override |
| public void createIt( |
| Statement st, |
| String fullName, |
| ConnManager manager) throws SQLException { |
| st.executeUpdate("CREATE OR REPLACE FUNCTION " + fullName + " (" |
| + "IN " + manager.escapeColName("id") + " INT," |
| + "IN " + manager.escapeColName("name") + " VARCHAR(24)," |
| + "IN " + manager.escapeColName("start_date") + " DATE," |
| + "IN " + manager.escapeColName("salary") + " FLOAT," |
| + "IN " + manager.escapeColName("dept") + " VARCHAR(32)" |
| + ") " |
| + "RETURNS VOID " |
| + "AS $$ " |
| + "BEGIN " |
| + "INSERT INTO " |
| + quoteTableOrSchemaName(SCHEMA_PUBLIC) |
| + "." |
| + quoteTableOrSchemaName(TABLE_NAME) |
| + " (" |
| + manager.escapeColName("id") |
| +", " |
| + manager.escapeColName("name") |
| +", " |
| + manager.escapeColName("start_date") |
| +", " |
| + manager.escapeColName("salary") |
| +", " |
| + manager.escapeColName("dept") |
| + ") VALUES (" |
| + manager.escapeColName("id") |
| +", " |
| + manager.escapeColName("name") |
| +", " |
| + manager.escapeColName("start_date") |
| +", " |
| + manager.escapeColName("salary") |
| +", " |
| + manager.escapeColName("dept") |
| + ");" |
| + "END;" |
| + "$$ LANGUAGE plpgsql;"); |
| } |
| }; |
| create(procedureName, "FUNCTION", schema, createIt); |
| } |
| |
| private void create( |
| String name, |
| String type, |
| String schema, |
| CreateIt createIt) { |
| SqoopOptions options = new SqoopOptions(CONNECT_STRING, name); |
| options.setUsername(DATABASE_USER); |
| |
| ConnManager manager = null; |
| Statement st = null; |
| |
| try { |
| manager = new PostgresqlManager(options); |
| st = connection.createStatement(); |
| |
| // Create schema if not exists in dummy way (always create and ignore |
| // errors. |
| try { |
| st.executeUpdate("CREATE SCHEMA " + quoteTableOrSchemaName(schema)); |
| connection.commit(); |
| } catch (SQLException e) { |
| LOG.info("Couldn't create schema " + schema + " (is o.k. as long as" |
| + "the schema already exists.", e); |
| connection.rollback(); |
| } |
| |
| String fullTableName = quoteTableOrSchemaName(schema) |
| + "." + quoteTableOrSchemaName(name); |
| |
| try { |
| // Try to remove the table first. DROP TABLE IF EXISTS didn't |
| // get added until pg 8.3, so we just use "DROP TABLE" and ignore |
| // any exception here if one occurs. |
| st.executeUpdate("DROP " + type + " " + fullTableName); |
| } catch (SQLException e) { |
| LOG.info("Couldn't drop " |
| + type.toLowerCase() |
| + " " +fullTableName |
| + " (ok)", |
| e); |
| // Now we need to reset the transaction. |
| connection.rollback(); |
| } |
| |
| createIt.createIt(st, fullTableName, manager); |
| |
| connection.commit(); |
| } catch (SQLException sqlE) { |
| LOG.error("Encountered SQL Exception: " + sqlE); |
| sqlE.printStackTrace(); |
| fail("SQLException when running test setUp(): " + sqlE); |
| } finally { |
| try { |
| if (null != st) { |
| st.close(); |
| } |
| |
| if (null != manager) { |
| manager.close(); |
| } |
| } catch (SQLException sqlE) { |
| LOG.warn("Got SQLException when closing connection: " + sqlE); |
| } |
| } |
| |
| LOG.debug("setUp complete."); |
| } |
| |
| private String [] getArgv(boolean useTable, |
| String... extraArgs) { |
| ArrayList<String> args = new ArrayList<String>(); |
| |
| CommonArgs.addHadoopFlags(args); |
| |
| if (useTable) { |
| args.add("--table"); |
| args.add(TABLE_NAME); |
| } else { |
| args.add("--call"); |
| args.add(PROCEDURE_NAME); |
| } |
| args.add("--export-dir"); |
| args.add(getWarehouseDir()); |
| args.add("--fields-terminated-by"); |
| args.add(","); |
| args.add("--lines-terminated-by"); |
| args.add("\\n"); |
| args.add("--connect"); |
| args.add(CONNECT_STRING); |
| args.add("--username"); |
| args.add(DATABASE_USER); |
| args.add("--password"); |
| args.add(PASSWORD); |
| args.add("-m"); |
| args.add("1"); |
| |
| for (String arg : extraArgs) { |
| args.add(arg); |
| } |
| |
| return args.toArray(new String[0]); |
| } |
| |
| protected void createTestFile(String filename, |
| String[] lines) |
| throws IOException { |
| new File(getWarehouseDir()).mkdirs(); |
| File file = new File(getWarehouseDir() + "/" + filename); |
| Writer output = new BufferedWriter(new FileWriter(file)); |
| for(String line : lines) { |
| output.write(line); |
| output.write("\n"); |
| } |
| output.close(); |
| } |
| |
| @Test |
| public void testExport() throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| runExport(getArgv(true)); |
| |
| assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); |
| } |
| |
| @Test |
| public void testExportUsingProcedure() throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| runExport(getArgv(false)); |
| |
| assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); |
| } |
| |
| @Test |
| public void testExportStaging() throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, }; |
| |
| runExport(getArgv(true, extra)); |
| |
| assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); |
| } |
| |
| @Test |
| public void testExportDirect() throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| String[] extra = new String[] {"--direct"}; |
| |
| runExport(getArgv(true, extra)); |
| |
| assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); |
| } |
| |
| @Test |
| public void testExportCustomSchema() throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| String[] extra = new String[] {"--", |
| "--schema", |
| SCHEMA_SPECIAL, |
| }; |
| |
| runExport(getArgv(true, extra)); |
| |
| assertRowCount(2, |
| quoteTableOrSchemaName(SCHEMA_SPECIAL) |
| + "." + quoteTableOrSchemaName(TABLE_NAME), |
| connection); |
| } |
| |
| @Test |
| public void testExportCustomSchemaStaging() throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| String[] extra = new String[] { |
| "--staging-table", |
| STAGING_TABLE_NAME, |
| "--", |
| "--schema", |
| SCHEMA_SPECIAL, |
| }; |
| |
| runExport(getArgv(true, extra)); |
| |
| assertRowCount(2, |
| quoteTableOrSchemaName(SCHEMA_SPECIAL) |
| + "." + quoteTableOrSchemaName(TABLE_NAME), |
| connection); |
| } |
| |
| @Test |
| public void testExportCustomSchemaStagingClear() |
| throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| String[] extra = new String[] { |
| "--staging-table", |
| STAGING_TABLE_NAME, |
| "--clear-staging-table", |
| "--", |
| "--schema", |
| SCHEMA_SPECIAL, |
| }; |
| |
| runExport(getArgv(true, extra)); |
| |
| assertRowCount(2, |
| quoteTableOrSchemaName(SCHEMA_SPECIAL) |
| + "." + quoteTableOrSchemaName(TABLE_NAME), |
| connection); |
| } |
| |
| @Test |
| public void testExportCustomSchemaDirect() throws IOException, SQLException { |
| createTestFile("inputFile", new String[] { |
| "2,Bob,2009-04-20,400,sales", |
| "3,Fred,2009-01-23,15,marketing", |
| }); |
| |
| String[] extra = new String[] { |
| "--direct", |
| "--", |
| "--schema", |
| SCHEMA_SPECIAL, |
| }; |
| |
| runExport(getArgv(true, extra)); |
| |
| assertRowCount(2, |
| quoteTableOrSchemaName(SCHEMA_SPECIAL) |
| + "." + quoteTableOrSchemaName(TABLE_NAME), |
| connection); |
| } |
| |
| public static void assertRowCount(long expected, |
| String tableName, |
| Connection connection) { |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = connection.createStatement(); |
| rs = stmt.executeQuery("SELECT count(*) FROM " + tableName); |
| |
| rs.next(); |
| |
| assertEquals(expected, rs.getLong(1)); |
| } catch (SQLException e) { |
| LOG.error("Can't verify number of rows", e); |
| fail(); |
| } finally { |
| try { |
| connection.commit(); |
| |
| if (stmt != null) { |
| stmt.close(); |
| } |
| if (rs != null) { |
| rs.close(); |
| } |
| } catch (SQLException ex) { |
| LOG.info("Ignored exception in finally block."); |
| } |
| } |
| } |
| |
| public String quoteTableOrSchemaName(String tableName) { |
| return "\"" + tableName + "\""; |
| } |
| } |