blob: 457b398aa485358b36d615ba33990357f60aebda [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.postgresql;
import static org.apache.sqoop.manager.postgresql.PostgresqlTestUtil.CONNECT_STRING;
import static org.apache.sqoop.manager.postgresql.PostgresqlTestUtil.DATABASE_USER;
import static org.apache.sqoop.manager.postgresql.PostgresqlTestUtil.PASSWORD;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.testcategories.thirdpartytest.PostgresqlTest;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.ExportJobTestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.PostgresqlManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
*
*/
@Category(PostgresqlTest.class)
public class PostgresqlExportTest extends ExportJobTestCase {
public static final Log LOG = LogFactory.getLog(
PostgresqlExportTest.class.getName());
static final String TABLE_NAME = "EMPLOYEES_PG";
static final String PROCEDURE_NAME = "INSERT_AN_EMPLOYEE";
static final String STAGING_TABLE_NAME = "STAGING";
static final String SCHEMA_PUBLIC = "public";
static final String SCHEMA_SPECIAL = "special";
protected Connection connection;
@Override
protected boolean useHsqldbTestServer() {
return false;
}
private String getDropTableStatement(String tableName, String schema) {
return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName);
}
@Before
public void setUp() {
super.setUp();
LOG.debug("Setting up postgresql test: " + CONNECT_STRING);
try {
connection = DriverManager.getConnection(CONNECT_STRING, DATABASE_USER, PASSWORD);
connection.setAutoCommit(false);
} catch (SQLException ex) {
LOG.error("Can't create connection", ex);
throw new RuntimeException(ex);
}
createTable(TABLE_NAME, SCHEMA_PUBLIC);
createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC);
createTable(TABLE_NAME, SCHEMA_SPECIAL);
createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL);
createProcedure(PROCEDURE_NAME, SCHEMA_PUBLIC);
LOG.debug("setUp complete.");
}
@Override
public void tearDown() {
try {
Statement stmt = connection.createStatement();
stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC));
stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_PUBLIC));
stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_SPECIAL));
stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_SPECIAL));
} catch(SQLException e) {
LOG.error("Can't clean up the database:", e);
}
super.tearDown();
try {
connection.close();
} catch (SQLException e) {
LOG.error("Ignoring exception in tearDown", e);
}
}
private interface CreateIt {
void createIt(
Statement st,
String fullName,
ConnManager manager) throws SQLException;
}
private void createTable(String tableName, String schema) {
CreateIt createIt = new CreateIt() {
@Override
public void createIt(
Statement st,
String fullName,
ConnManager manager) throws SQLException {
st.executeUpdate("CREATE TABLE " + fullName + " ("
+ manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+ manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
+ manager.escapeColName("start_date") + " DATE, "
+ manager.escapeColName("salary") + " FLOAT, "
+ manager.escapeColName("dept") + " VARCHAR(32))");
}
};
create(tableName, "TABLE", schema, createIt);
}
private void createProcedure(String procedureName, String schema) {
CreateIt createIt = new CreateIt() {
@Override
public void createIt(
Statement st,
String fullName,
ConnManager manager) throws SQLException {
st.executeUpdate("CREATE OR REPLACE FUNCTION " + fullName + " ("
+ "IN " + manager.escapeColName("id") + " INT,"
+ "IN " + manager.escapeColName("name") + " VARCHAR(24),"
+ "IN " + manager.escapeColName("start_date") + " DATE,"
+ "IN " + manager.escapeColName("salary") + " FLOAT,"
+ "IN " + manager.escapeColName("dept") + " VARCHAR(32)"
+ ") "
+ "RETURNS VOID "
+ "AS $$ "
+ "BEGIN "
+ "INSERT INTO "
+ quoteTableOrSchemaName(SCHEMA_PUBLIC)
+ "."
+ quoteTableOrSchemaName(TABLE_NAME)
+ " ("
+ manager.escapeColName("id")
+", "
+ manager.escapeColName("name")
+", "
+ manager.escapeColName("start_date")
+", "
+ manager.escapeColName("salary")
+", "
+ manager.escapeColName("dept")
+ ") VALUES ("
+ manager.escapeColName("id")
+", "
+ manager.escapeColName("name")
+", "
+ manager.escapeColName("start_date")
+", "
+ manager.escapeColName("salary")
+", "
+ manager.escapeColName("dept")
+ ");"
+ "END;"
+ "$$ LANGUAGE plpgsql;");
}
};
create(procedureName, "FUNCTION", schema, createIt);
}
private void create(
String name,
String type,
String schema,
CreateIt createIt) {
SqoopOptions options = new SqoopOptions(CONNECT_STRING, name);
options.setUsername(DATABASE_USER);
ConnManager manager = null;
Statement st = null;
try {
manager = new PostgresqlManager(options);
st = connection.createStatement();
// Create schema if not exists in dummy way (always create and ignore
// errors.
try {
st.executeUpdate("CREATE SCHEMA " + quoteTableOrSchemaName(schema));
connection.commit();
} catch (SQLException e) {
LOG.info("Couldn't create schema " + schema + " (is o.k. as long as"
+ "the schema already exists.", e);
connection.rollback();
}
String fullTableName = quoteTableOrSchemaName(schema)
+ "." + quoteTableOrSchemaName(name);
try {
// Try to remove the table first. DROP TABLE IF EXISTS didn't
// get added until pg 8.3, so we just use "DROP TABLE" and ignore
// any exception here if one occurs.
st.executeUpdate("DROP " + type + " " + fullTableName);
} catch (SQLException e) {
LOG.info("Couldn't drop "
+ type.toLowerCase()
+ " " +fullTableName
+ " (ok)",
e);
// Now we need to reset the transaction.
connection.rollback();
}
createIt.createIt(st, fullTableName, manager);
connection.commit();
} catch (SQLException sqlE) {
LOG.error("Encountered SQL Exception: " + sqlE);
sqlE.printStackTrace();
fail("SQLException when running test setUp(): " + sqlE);
} finally {
try {
if (null != st) {
st.close();
}
if (null != manager) {
manager.close();
}
} catch (SQLException sqlE) {
LOG.warn("Got SQLException when closing connection: " + sqlE);
}
}
LOG.debug("setUp complete.");
}
private String [] getArgv(boolean useTable,
String... extraArgs) {
ArrayList<String> args = new ArrayList<String>();
CommonArgs.addHadoopFlags(args);
if (useTable) {
args.add("--table");
args.add(TABLE_NAME);
} else {
args.add("--call");
args.add(PROCEDURE_NAME);
}
args.add("--export-dir");
args.add(getWarehouseDir());
args.add("--fields-terminated-by");
args.add(",");
args.add("--lines-terminated-by");
args.add("\\n");
args.add("--connect");
args.add(CONNECT_STRING);
args.add("--username");
args.add(DATABASE_USER);
args.add("--password");
args.add(PASSWORD);
args.add("-m");
args.add("1");
for (String arg : extraArgs) {
args.add(arg);
}
return args.toArray(new String[0]);
}
protected void createTestFile(String filename,
String[] lines)
throws IOException {
new File(getWarehouseDir()).mkdirs();
File file = new File(getWarehouseDir() + "/" + filename);
Writer output = new BufferedWriter(new FileWriter(file));
for(String line : lines) {
output.write(line);
output.write("\n");
}
output.close();
}
@Test
public void testExport() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
runExport(getArgv(true));
assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
}
@Test
public void testExportUsingProcedure() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
runExport(getArgv(false));
assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
}
@Test
public void testExportStaging() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, };
runExport(getArgv(true, extra));
assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
}
@Test
public void testExportDirect() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
String[] extra = new String[] {"--direct"};
runExport(getArgv(true, extra));
assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
}
@Test
public void testExportCustomSchema() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
String[] extra = new String[] {"--",
"--schema",
SCHEMA_SPECIAL,
};
runExport(getArgv(true, extra));
assertRowCount(2,
quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ "." + quoteTableOrSchemaName(TABLE_NAME),
connection);
}
@Test
public void testExportCustomSchemaStaging() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
String[] extra = new String[] {
"--staging-table",
STAGING_TABLE_NAME,
"--",
"--schema",
SCHEMA_SPECIAL,
};
runExport(getArgv(true, extra));
assertRowCount(2,
quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ "." + quoteTableOrSchemaName(TABLE_NAME),
connection);
}
@Test
public void testExportCustomSchemaStagingClear()
throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
String[] extra = new String[] {
"--staging-table",
STAGING_TABLE_NAME,
"--clear-staging-table",
"--",
"--schema",
SCHEMA_SPECIAL,
};
runExport(getArgv(true, extra));
assertRowCount(2,
quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ "." + quoteTableOrSchemaName(TABLE_NAME),
connection);
}
@Test
public void testExportCustomSchemaDirect() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});
String[] extra = new String[] {
"--direct",
"--",
"--schema",
SCHEMA_SPECIAL,
};
runExport(getArgv(true, extra));
assertRowCount(2,
quoteTableOrSchemaName(SCHEMA_SPECIAL)
+ "." + quoteTableOrSchemaName(TABLE_NAME),
connection);
}
public static void assertRowCount(long expected,
String tableName,
Connection connection) {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.createStatement();
rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
rs.next();
assertEquals(expected, rs.getLong(1));
} catch (SQLException e) {
LOG.error("Can't verify number of rows", e);
fail();
} finally {
try {
connection.commit();
if (stmt != null) {
stmt.close();
}
if (rs != null) {
rs.close();
}
} catch (SQLException ex) {
LOG.info("Ignored exception in finally block.");
}
}
}
public String quoteTableOrSchemaName(String tableName) {
return "\"" + tableName + "\"";
}
}