/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;

import static org.apache.sqoop.manager.JdbcDrivers.DB2;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.ExportBatchOutputFormat;
import org.apache.sqoop.mapreduce.JdbcExportJob;
import org.apache.sqoop.mapreduce.db.Db2DataDrivenDBInputFormat;
import org.apache.sqoop.util.ExportException;
import org.apache.sqoop.util.ImportException;
import org.apache.sqoop.util.LoggingUtils;
/**
* Manages connections to DB2 databases. Requires the DB2 JDBC driver.
*/
public class Db2Manager extends GenericJdbcManager {
public static final Log LOG = LogFactory.getLog(
Db2Manager.class.getName());
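  // DB2 XML columns have no direct SQL-type mapping, so they are
  // represented as String on both the Java and Hive side.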
private static final String XML_TO_JAVA_DATA_TYPE = "String";
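  // Lazily populated cache of column name -> database-specific type name.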
private Map<String, String> columnTypeNames;
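  // Long option name for the DB2-specific --schema extra argument.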
public static final String SCHEMA = "schema";
  /**
   * Query to list all tables in a schema (bound to the current user's
   * schema when none is supplied). Note that this list does not identify
   * the table owners, which would be required to verify that the tables
   * can be operated on for import/export purposes.
   */
  public static final String QUERY_LIST_SCHEMA_TABLES =
      "SELECT DISTINCT NAME FROM SYSIBM.SYSTABLES "
      + "WHERE CREATOR = ? AND TYPE = 'T'";
/**
* Query to get the current user's schema for the DB session. Used in case of
* wallet logins.
*/
  public static final String QUERY_GET_USERSCHEMA =
      "SELECT CURRENT SCHEMA FROM SYSIBM.SYSDUMMY1";
/**
* DB2 schema that we should use.
*/
private String schema = null;
public Db2Manager(final SqoopOptions opts) {
super(DB2.getDriverClass(), opts);
// Try to parse extra arguments
try {
      this.schema = parseExtraScheArgs(opts.getExtraArgs(), getExtraOptions());
} catch (ParseException e) {
throw new RuntimeException("Can't parse extra arguments", e);
}
}
/**
* Perform an import of a table from the database into HDFS.
*/
@Override
  public void importTable(ImportJobContext context)
throws IOException, ImportException {
context.setConnManager(this);
// Specify the DB2-specific DBInputFormat for import.
context.setInputFormat(Db2DataDrivenDBInputFormat.class);
super.importTable(context);
}
/**
* Export data stored in HDFS into a table in a database.
*/
@Override
  public void exportTable(ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
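    // Export with ExportBatchOutputFormat, which groups the generated
    // INSERT statements into JDBC batches; this is generally faster
    // against DB2 than per-row inserts.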
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
ExportBatchOutputFormat.class);
exportJob.runExport();
}
  /**
   * DB2 does not support the CURRENT_TIMESTAMP() function. Instead it
   * exposes the current timestamp through the SYSIBM.SYSDUMMY1 dummy table.
   */
@Override
public String getCurTimestampQuery() {
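    // "WITH UR" reads with uncommitted-read isolation, avoiding locks for
    // this simple lookup.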
return "SELECT CURRENT TIMESTAMP FROM SYSIBM.SYSDUMMY1 WITH UR";
}
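  /**
   * A DB2 connection is scoped to a single database, so report the schemas
   * visible through the connection's metadata in place of database names.
   */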
@Override
public String[] listDatabases() {
Connection conn = null;
ResultSet rset = null;
List<String> databases = new ArrayList<String>();
try {
conn = getConnection();
rset = conn.getMetaData().getSchemas();
while (rset.next()) {
// The ResultSet contains two columns - TABLE_SCHEM(1),
// TABLE_CATALOG(2). We are only interested in TABLE_SCHEM which
// represents schema name.
databases.add(rset.getString(1));
}
} catch (SQLException sqle) {
LoggingUtils.logAll(LOG, "Failed to list databases", sqle);
throw new RuntimeException(sqle);
} finally {
if (rset != null) {
try {
rset.close();
} catch (SQLException re) {
LoggingUtils.logAll(LOG, "Failed to close resultset", re);
}
}
}
return databases.toArray(new String[databases.size()]);
}
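  /**
   * Look up the current session schema via SYSIBM.SYSDUMMY1. This is used
   * as the default schema when none is supplied on the command line, e.g.
   * for wallet logins where no user name is set explicitly.
   */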
public static String getUserSchema(Connection conn) {
Statement stmt = null;
ResultSet rset = null;
String currSchema = null;
try {
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
rset = stmt.executeQuery(QUERY_GET_USERSCHEMA);
if (rset.next()) {
currSchema = rset.getString(1);
}
} catch (SQLException e) {
LoggingUtils.logAll(LOG, "Failed to get user schema", e);
} finally {
if (rset != null) {
try {
rset.close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Failed to close resultset", ex);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Failed to close statement", ex);
}
}
}
if (currSchema == null) {
throw new RuntimeException("Unable to get current user schema");
}
return currSchema;
}
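  /**
   * List the tables in the schema supplied via --schema, falling back to
   * the current session schema. Table names are prefixed with the schema
   * only when one was supplied explicitly.
   */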
@Override
public String[] listTables() {
Connection conn = null;
PreparedStatement pStmt = null;
ResultSet rset = null;
List<String> tables = new ArrayList<String>();
String currUserSchema = null;
try {
conn = getConnection();
currUserSchema = getUserSchema(conn);
pStmt = conn.prepareStatement(QUERY_LIST_SCHEMA_TABLES,
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
      // If the user did not supply a schema on the command line, fall back
      // to the current session schema.
      if (schema == null) {
        pStmt.setString(1, currUserSchema);
      } else { // the user supplied a schema via --schema
        pStmt.setString(1, schema);
      }
      rset = pStmt.executeQuery();
      // executeQuery() never returns null, so detect a missing schema by
      // checking whether any rows came back rather than testing for null.
      boolean foundTables = false;
      while (rset.next()) {
        foundTables = true;
        if (schema == null) {
          tables.add(rset.getString(1));
        } else {
          tables.add(schema + "." + rset.getString(1));
        }
      }
      if (schema != null && !foundTables) {
        LOG.debug("schema=" + schema
            + " returned no tables; it may not exist in the current database");
      }
} catch (SQLException e) {
LoggingUtils.logAll(LOG, "Failed to list tables", e);
} finally {
if (rset != null) {
try {
rset.close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Failed to close resultset", ex);
}
}
if (pStmt != null) {
try {
pStmt.close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Failed to close statement", ex);
}
}
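      // Discard the cached connection so subsequent operations reconnect
      // cleanly.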
try {
close();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to discard connection", ex);
}
}
return tables.toArray(new String[tables.size()]);
}
  /**
   * Return the Hive type for a SQL type.
   *
   * @param tableName
   *          table name
   * @param columnName
   *          column name
   * @param sqlType
   *          SQL data type
   * @return the Hive type
   */
@Override
public String toHiveType(String tableName, String columnName, int sqlType) {
String hiveType = super.toHiveType(tableName, columnName, sqlType);
if (hiveType == null) {
hiveType = toDbSpecificHiveType(tableName, columnName);
}
return hiveType;
}
/**
* Resolve a database-specific type to the Hive type that should contain it.
*
* @param tableName
* table name
* @param colName
* column name
   * @return the name of a Hive type to hold the SQL datatype, or null if
   *         none applies.
*/
private String toDbSpecificHiveType(String tableName, String colName) {
if (columnTypeNames == null) {
columnTypeNames = getColumnTypeNames(tableName, options.getCall(),
options.getSqlQuery());
}
LOG.debug("database-specific Column Types and names returned = ("
+ StringUtils.join(columnTypeNames.keySet(), ",") + ")=>("
+ StringUtils.join(columnTypeNames.values(), ",") + ")");
    String colTypeName = columnTypeNames.get(colName);
    if (colTypeName != null && colTypeName.toUpperCase().startsWith("XML")) {
      return XML_TO_JAVA_DATA_TYPE;
    }
return null;
}
  /**
   * Return the Java type for a SQL type.
   *
   * @param tableName
   *          table name
   * @param columnName
   *          column name
   * @param sqlType
   *          SQL type
   * @return the Java type
   */
@Override
public String toJavaType(String tableName, String columnName, int sqlType) {
String javaType = super.toJavaType(tableName, columnName, sqlType);
if (javaType == null) {
javaType = toDbSpecificJavaType(tableName, columnName);
}
return javaType;
}
/**
* Resolve a database-specific type to the Java type that should contain it.
*
* @param tableName
* table name
* @param colName
* column name
   * @return the name of a Java type to hold the SQL datatype, or null if
   *         none applies.
*/
private String toDbSpecificJavaType(String tableName, String colName) {
if (columnTypeNames == null) {
columnTypeNames = getColumnTypeNames(tableName, options.getCall(),
options.getSqlQuery());
}
    String colTypeName = columnTypeNames.get(colName);
    if (colTypeName != null && colTypeName.equalsIgnoreCase("XML")) {
      return XML_TO_JAVA_DATA_TYPE;
    }
return null;
}
  /**
   * Create the related options for the DB2-specific extra parameters.
   * These are passed after a {@code --} separator on the Sqoop command
   * line, e.g. {@code ... -- --schema MYSCHEMA}.
   * @return the RelatedOptions accepting the DB2 extra arguments
   */
@SuppressWarnings("static-access")
protected RelatedOptions getExtraOptions() {
// Connection args (common)
RelatedOptions extraOptions = new RelatedOptions("DB2 extra options:");
    extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
        .withDescription("Optional schema name").withLongOpt(SCHEMA)
        .create(SCHEMA));
return extraOptions;
}
}