| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.sqoop.manager; |
| |
| import org.apache.hadoop.sqoop.SqoopOptions; |
| import org.apache.hadoop.sqoop.hive.HiveTypes; |
| import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob; |
| import org.apache.hadoop.sqoop.mapreduce.ExportJob; |
| import org.apache.hadoop.sqoop.util.ExportException; |
| import org.apache.hadoop.sqoop.util.ImportException; |
| import org.apache.hadoop.sqoop.util.ResultSetPrinter; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.ResultSetMetaData; |
| import java.sql.SQLException; |
| import java.sql.Types; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| |
| /** |
| * ConnManager implementation for generic SQL-compliant database. |
| * This is an abstract class; it requires a database-specific |
| * ConnManager implementation to actually create the connection. |
| */ |
| public abstract class SqlManager extends ConnManager { |
| |
| public static final Log LOG = LogFactory.getLog(SqlManager.class.getName()); |
| |
| protected SqoopOptions options; |
| |
| /** |
| * Constructs the SqlManager |
| * @param opts |
| * @param specificMgr |
| */ |
| public SqlManager(final SqoopOptions opts) { |
| this.options = opts; |
| } |
| |
| /** |
| * @return the SQL query to use in getColumnNames() in case this logic must |
| * be tuned per-database, but the main extraction loop is still inheritable. |
| */ |
| protected String getColNamesQuery(String tableName) { |
| // adding where clause to prevent loading a big table |
| return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t WHERE 1=0"; |
| } |
| |
| @Override |
| public String[] getColumnNames(String tableName) { |
| String stmt = getColNamesQuery(tableName); |
| |
| ResultSet results; |
| try { |
| results = execute(stmt); |
| } catch (SQLException sqlE) { |
| LOG.error("Error executing statement: " + sqlE.toString()); |
| return null; |
| } |
| |
| try { |
| int cols = results.getMetaData().getColumnCount(); |
| ArrayList<String> columns = new ArrayList<String>(); |
| ResultSetMetaData metadata = results.getMetaData(); |
| for (int i = 1; i < cols + 1; i++) { |
| String colName = metadata.getColumnName(i); |
| if (colName == null || colName.equals("")) { |
| colName = metadata.getColumnLabel(i); |
| } |
| columns.add(colName); |
| } |
| return columns.toArray(new String[0]); |
| } catch (SQLException sqlException) { |
| LOG.error("Error reading from database: " + sqlException.toString()); |
| return null; |
| } finally { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LOG.warn("SQLException closing ResultSet: " + sqlE.toString()); |
| } |
| } |
| } |
| |
| /** |
| * @return the SQL query to use in getColumnTypes() in case this logic must |
| * be tuned per-database, but the main extraction loop is still inheritable. |
| */ |
| protected String getColTypesQuery(String tableName) { |
| return getColNamesQuery(tableName); |
| } |
| |
| @Override |
| public Map<String, Integer> getColumnTypes(String tableName) { |
| String stmt = getColTypesQuery(tableName); |
| |
| ResultSet results; |
| try { |
| results = execute(stmt); |
| } catch (SQLException sqlE) { |
| LOG.error("Error executing statement: " + sqlE.toString()); |
| return null; |
| } |
| |
| try { |
| Map<String, Integer> colTypes = new HashMap<String, Integer>(); |
| |
| int cols = results.getMetaData().getColumnCount(); |
| ResultSetMetaData metadata = results.getMetaData(); |
| for (int i = 1; i < cols + 1; i++) { |
| int typeId = metadata.getColumnType(i); |
| String colName = metadata.getColumnName(i); |
| if (colName == null || colName.equals("")) { |
| colName = metadata.getColumnLabel(i); |
| } |
| |
| colTypes.put(colName, Integer.valueOf(typeId)); |
| } |
| |
| return colTypes; |
| } catch (SQLException sqlException) { |
| LOG.error("Error reading from database: " + sqlException.toString()); |
| return null; |
| } finally { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LOG.warn("SQLException closing ResultSet: " + sqlE.toString()); |
| } |
| } |
| } |
| |
| @Override |
| public ResultSet readTable(String tableName, String[] columns) throws SQLException { |
| if (columns == null) { |
| columns = getColumnNames(tableName); |
| } |
| |
| StringBuilder sb = new StringBuilder(); |
| sb.append("SELECT "); |
| boolean first = true; |
| for (String col : columns) { |
| if (!first) { |
| sb.append(", "); |
| } |
| sb.append(escapeColName(col)); |
| first = false; |
| } |
| sb.append(" FROM "); |
| sb.append(escapeTableName(tableName)); |
| sb.append(" AS "); // needed for hsqldb; doesn't hurt anyone else. |
| sb.append(escapeTableName(tableName)); |
| |
| return execute(sb.toString()); |
| } |
| |
| @Override |
| public String[] listDatabases() { |
| // TODO(aaron): Implement this! |
| LOG.error("Generic SqlManager.listDatabases() not implemented."); |
| return null; |
| } |
| |
| @Override |
| public String[] listTables() { |
| ResultSet results = null; |
| String [] tableTypes = {"TABLE"}; |
| try { |
| try { |
| DatabaseMetaData metaData = this.getConnection().getMetaData(); |
| results = metaData.getTables(null, null, null, tableTypes); |
| } catch (SQLException sqlException) { |
| LOG.error("Error reading database metadata: " + sqlException.toString()); |
| return null; |
| } |
| |
| if (null == results) { |
| return null; |
| } |
| |
| try { |
| ArrayList<String> tables = new ArrayList<String>(); |
| while (results.next()) { |
| String tableName = results.getString("TABLE_NAME"); |
| tables.add(tableName); |
| } |
| |
| return tables.toArray(new String[0]); |
| } catch (SQLException sqlException) { |
| LOG.error("Error reading from database: " + sqlException.toString()); |
| return null; |
| } |
| } finally { |
| if (null != results) { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LOG.warn("Exception closing ResultSet: " + sqlE.toString()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public String getPrimaryKey(String tableName) { |
| try { |
| DatabaseMetaData metaData = this.getConnection().getMetaData(); |
| ResultSet results = metaData.getPrimaryKeys(null, null, tableName); |
| if (null == results) { |
| return null; |
| } |
| |
| try { |
| if (results.next()) { |
| return results.getString("COLUMN_NAME"); |
| } else { |
| return null; |
| } |
| } finally { |
| results.close(); |
| getConnection().commit(); |
| } |
| } catch (SQLException sqlException) { |
| LOG.error("Error reading primary key metadata: " + sqlException.toString()); |
| return null; |
| } |
| } |
| |
| /** |
| * Retrieve the actual connection from the outer ConnManager |
| */ |
| public abstract Connection getConnection() throws SQLException; |
| |
| /** |
| * Default implementation of importTable() is to launch a MapReduce job |
| * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat. |
| */ |
| public void importTable(ImportJobContext context) |
| throws IOException, ImportException { |
| String tableName = context.getTableName(); |
| String jarFile = context.getJarFile(); |
| SqoopOptions options = context.getOptions(); |
| DataDrivenImportJob importer = new DataDrivenImportJob(options); |
| String splitCol = options.getSplitByCol(); |
| if (null == splitCol) { |
| // If the user didn't specify a splitting column, try to infer one. |
| splitCol = getPrimaryKey(tableName); |
| } |
| |
| if (null == splitCol) { |
| // Can't infer a primary key. |
| throw new ImportException("No primary key could be found for table " + tableName |
| + ". Please specify one with --split-by."); |
| } |
| |
| importer.runImport(tableName, jarFile, splitCol, options.getConf()); |
| } |
| |
| /** |
| * executes an arbitrary SQL statement |
| * @param stmt The SQL statement to execute |
| * @return A ResultSet encapsulating the results or null on error |
| */ |
| protected ResultSet execute(String stmt, Object... args) throws SQLException { |
| PreparedStatement statement = null; |
| statement = this.getConnection().prepareStatement(stmt, |
| ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); |
| if (null != args) { |
| for (int i = 0; i < args.length; i++) { |
| statement.setObject(i + 1, args[i]); |
| } |
| } |
| |
| LOG.info("Executing SQL statement: " + stmt); |
| return statement.executeQuery(); |
| } |
| |
| /** |
| * Resolve a database-specific type to the Java type that should contain it. |
| * @param sqlType |
| * @return the name of a Java type to hold the sql datatype, or null if none. |
| */ |
| public String toJavaType(int sqlType) { |
| // mappings from http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html |
| if (sqlType == Types.INTEGER) { |
| return "Integer"; |
| } else if (sqlType == Types.VARCHAR) { |
| return "String"; |
| } else if (sqlType == Types.CHAR) { |
| return "String"; |
| } else if (sqlType == Types.LONGVARCHAR) { |
| return "String"; |
| } else if (sqlType == Types.NUMERIC) { |
| return "java.math.BigDecimal"; |
| } else if (sqlType == Types.DECIMAL) { |
| return "java.math.BigDecimal"; |
| } else if (sqlType == Types.BIT) { |
| return "Boolean"; |
| } else if (sqlType == Types.BOOLEAN) { |
| return "Boolean"; |
| } else if (sqlType == Types.TINYINT) { |
| return "Integer"; |
| } else if (sqlType == Types.SMALLINT) { |
| return "Integer"; |
| } else if (sqlType == Types.BIGINT) { |
| return "Long"; |
| } else if (sqlType == Types.REAL) { |
| return "Float"; |
| } else if (sqlType == Types.FLOAT) { |
| return "Double"; |
| } else if (sqlType == Types.DOUBLE) { |
| return "Double"; |
| } else if (sqlType == Types.DATE) { |
| return "java.sql.Date"; |
| } else if (sqlType == Types.TIME) { |
| return "java.sql.Time"; |
| } else if (sqlType == Types.TIMESTAMP) { |
| return "java.sql.Timestamp"; |
| } else { |
| // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY, |
| // STRUCT, REF, JAVA_OBJECT. |
| // return database specific java data type |
| return null; |
| } |
| } |
| |
| /** |
| * Resolve a database-specific type to Hive data type |
| * @param sqlType sql type |
| * @return hive type |
| */ |
| public String toHiveType(int sqlType) { |
| return HiveTypes.toHiveType(sqlType); |
| } |
| |
| public void close() throws SQLException { |
| } |
| |
| /** |
| * Poor man's SQL query interface; used for debugging. |
| * @param s |
| */ |
| public void execAndPrint(String s) { |
| System.out.println("Executing statement: " + s); |
| ResultSet results; |
| try { |
| results = execute(s); |
| } catch (SQLException sqlE) { |
| LOG.error("Error executing statement: " + sqlE.toString()); |
| return; |
| } |
| |
| try { |
| try { |
| int cols = results.getMetaData().getColumnCount(); |
| System.out.println("Got " + cols + " columns back"); |
| if (cols > 0) { |
| System.out.println("Schema: " + results.getMetaData().getSchemaName(1)); |
| System.out.println("Table: " + results.getMetaData().getTableName(1)); |
| } |
| } catch (SQLException sqlE) { |
| LOG.error("SQLException reading result metadata: " + sqlE.toString()); |
| } |
| |
| try { |
| new ResultSetPrinter().printResultSet(System.out, results); |
| } catch (IOException ioe) { |
| LOG.error("IOException writing results to stdout: " + ioe.toString()); |
| return; |
| } |
| } finally { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LOG.warn("SQLException closing ResultSet: " + sqlE.toString()); |
| } |
| } |
| } |
| |
| /** |
| * Create a connection to the database; usually used only from within |
| * getConnection(), which enforces a singleton guarantee around the |
| * Connection object. |
| */ |
| protected Connection makeConnection() throws SQLException { |
| |
| Connection connection; |
| String driverClass = getDriverClass(); |
| |
| try { |
| Class.forName(driverClass); |
| } catch (ClassNotFoundException cnfe) { |
| throw new RuntimeException("Could not load db driver class: " + driverClass); |
| } |
| |
| String username = options.getUsername(); |
| String password = options.getPassword(); |
| if (null == username) { |
| connection = DriverManager.getConnection(options.getConnectString()); |
| } else { |
| connection = DriverManager.getConnection(options.getConnectString(), username, password); |
| } |
| |
| // We only use this for metadata queries. Loosest semantics are okay. |
| connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED); |
| connection.setAutoCommit(false); |
| |
| return connection; |
| } |
| |
| /** |
| * Export data stored in HDFS into a table in a database |
| */ |
| public void exportTable(ExportJobContext context) |
| throws IOException, ExportException { |
| ExportJob exportJob = new ExportJob(context); |
| exportJob.runExport(); |
| } |
| } |