| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sqoop.manager; |
| |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.ResultSetMetaData; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Timestamp; |
| import java.sql.Types; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.TreeMap; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.sqoop.accumulo.AccumuloUtil; |
| import org.apache.sqoop.mapreduce.AccumuloImportJob; |
| import org.apache.sqoop.mapreduce.HBaseBulkImportJob; |
| import org.apache.sqoop.mapreduce.JdbcCallExportJob; |
| import org.apache.sqoop.util.LoggingUtils; |
| import org.apache.sqoop.util.SqlTypeMap; |
| |
| import org.apache.sqoop.SqoopOptions; |
| import org.apache.sqoop.hbase.HBaseUtil; |
| import org.apache.sqoop.mapreduce.DataDrivenImportJob; |
| import org.apache.sqoop.mapreduce.HBaseImportJob; |
| import org.apache.sqoop.mapreduce.ImportJobBase; |
| import org.apache.sqoop.mapreduce.JdbcExportJob; |
| import org.apache.sqoop.mapreduce.JdbcUpdateExportJob; |
| import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat; |
| import org.apache.sqoop.util.ExportException; |
| import org.apache.sqoop.util.ImportException; |
| import org.apache.sqoop.util.ResultSetPrinter; |
| |
| /** |
| * ConnManager implementation for generic SQL-compliant database. |
| * This is an abstract class; it requires a database-specific |
| * ConnManager implementation to actually create the connection. |
| */ |
| public abstract class SqlManager |
| extends org.apache.sqoop.manager.ConnManager { |
| |
| public static final Log LOG = LogFactory.getLog(SqlManager.class.getName()); |
| |
| /** Substring that must appear in free-form queries submitted by users. |
| * This is the string '$CONDITIONS'. |
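   * <p>For example, a parallel free-form import must embed the token so
   * that per-split predicates can be substituted at runtime (a usage
   * sketch; table and column names are hypothetical):
   * <pre>{@code
   * SELECT a.id, a.msg FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS
   * }</pre>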
| */ |
| public static final String SUBSTITUTE_TOKEN = |
| DataDrivenDBInputFormat.SUBSTITUTE_TOKEN; |
| |
| protected static final int DEFAULT_FETCH_SIZE = 1000; |
| |
| private Statement lastStatement; |
| |
| /** |
| * Constructs the SqlManager. |
| * @param opts the SqoopOptions describing the user's requested action. |
| */ |
| public SqlManager(final SqoopOptions opts) { |
| this.options = opts; |
| initOptionDefaults(); |
| } |
| |
| /** |
   * Sets default values for options that were not provided by the user.
| * Only options with database-specific defaults should be configured here. |
| */ |
| protected void initOptionDefaults() { |
| if (options.getFetchSize() == null) { |
| LOG.info("Using default fetchSize of " + DEFAULT_FETCH_SIZE); |
| options.setFetchSize(DEFAULT_FETCH_SIZE); |
| } |
| } |
| |
| /** |
   * @return the SQL query to use in getColumnNames(); override this when the
   * query must be tuned per-database while the main extraction loop remains
   * inheritable.
| */ |
| protected String getColNamesQuery(String tableName) { |
    // Add a WHERE clause that matches no rows, so the query returns
    // metadata without loading the table.
| return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t WHERE 1=0"; |
| } |
| |
| @Override |
| /** {@inheritDoc} */ |
| public String[] getColumnNames(String tableName) { |
| String stmt = getColNamesQuery(tableName); |
| return filterSpecifiedColumnNames(getColumnNamesForRawQuery(stmt)); |
| } |
| |
| /** |
   * Applies the --columns option, if specified.
   * @param columns the column names discovered from the database.
   * @return the subset of columns selected by --columns, using the
   * user-specified casing; all columns if --columns was not given.
| */ |
| protected String[] filterSpecifiedColumnNames(String[] columns) { |
| if (options.getColumns() == null) { |
| return columns; |
| } |
| List<String> colNames = new ArrayList<String>(); |
| for (String col : columns) { |
| String userColName = options.getColumnNameCaseInsensitive(col); |
| if (userColName != null) { |
| colNames.add(userColName); |
| } |
| } |
| return colNames.toArray(new String[colNames.size()]); |
| } |
| |
| @Override |
| /** {@inheritDoc} */ |
| public String [] getColumnNamesForQuery(String query) { |
| String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) "); |
| return getColumnNamesForRawQuery(rawQuery); |
| } |
| |
| /** |
| * Get column names for a query statement that we do not modify further. |
| */ |
| public String[] getColumnNamesForRawQuery(String stmt) { |
| ResultSet results; |
| try { |
| results = execute(stmt); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "Error executing statement: " + sqlE.toString(), |
| sqlE); |
| release(); |
| return null; |
| } |
| |
| try { |
| int cols = results.getMetaData().getColumnCount(); |
| ArrayList<String> columns = new ArrayList<String>(); |
| ResultSetMetaData metadata = results.getMetaData(); |
| for (int i = 1; i < cols + 1; i++) { |
| String colName = metadata.getColumnLabel(i); |
| if (colName == null || colName.equals("")) { |
| colName = metadata.getColumnName(i); |
| if (null == colName) { |
| colName = "_RESULT_" + i; |
| } |
| } |
| columns.add(colName); |
| LOG.debug("Found column " + colName); |
| } |
| return columns.toArray(new String[0]); |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading from database: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } finally { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "SQLException closing ResultSet: " |
| + sqlE.toString(), sqlE); |
| } |
| |
| release(); |
| } |
| } |
| |
| @Override |
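  /**
   * {@inheritDoc}
   * <p>Resolves parameter names through
   * DatabaseMetaData.getProcedureColumns(), ordering them by
   * ORDINAL_POSITION and skipping the return-value column.
   */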
| public String[] getColumnNamesForProcedure(String procedureName) { |
| List<String> ret = new ArrayList<String>(); |
| try { |
| DatabaseMetaData metaData = this.getConnection().getMetaData(); |
| ResultSet results = metaData.getProcedureColumns(null, null, |
| procedureName, null); |
| if (null == results) { |
| return null; |
| } |
| |
| try { |
| while (results.next()) { |
| if (results.getInt("COLUMN_TYPE") |
| != DatabaseMetaData.procedureColumnReturn) { |
| int index = results.getInt("ORDINAL_POSITION") - 1; |
| if (index < 0) { |
| continue; // actually the return type |
| } |
          for (int i = ret.size(); i < index; ++i) {
| ret.add(null); |
| } |
| String name = results.getString("COLUMN_NAME"); |
| if (index == ret.size()) { |
| ret.add(name); |
| } else { |
| ret.set(index, name); |
| } |
| } |
| } |
| LOG.debug("getColumnsNamesForProcedure returns " |
| + StringUtils.join(ret, ",")); |
| return ret.toArray(new String[ret.size()]); |
| } finally { |
| results.close(); |
| getConnection().commit(); |
| } |
| } catch (SQLException e) { |
| LoggingUtils.logAll(LOG, "Error reading procedure metadata: ", e); |
| throw new RuntimeException("Can't fetch column names for procedure.", e); |
| } |
| } |
| |
| /** |
   * @return the SQL query to use in getColumnTypes(); override this when the
   * query must be tuned per-database while the main extraction loop remains
   * inheritable.
| */ |
| protected String getColTypesQuery(String tableName) { |
| return getColNamesQuery(tableName); |
| } |
| |
| @Override |
| public Map<String, Integer> getColumnTypes(String tableName) { |
| String stmt = getColTypesQuery(tableName); |
| return getColumnTypesForRawQuery(stmt); |
| } |
| |
| @Override |
| public Map<String, Integer> getColumnTypesForQuery(String query) { |
| // Manipulate the query to return immediately, with zero rows. |
| String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) "); |
| return getColumnTypesForRawQuery(rawQuery); |
| } |
| |
| /** |
| * Get column types for a query statement that we do not modify further. |
| */ |
| protected Map<String, Integer> getColumnTypesForRawQuery(String stmt) { |
| Map<String, List<Integer>> colInfo = getColumnInfoForRawQuery(stmt); |
| if (colInfo == null) { |
| return null; |
| } |
| Map<String, Integer> colTypes = new SqlTypeMap<String, Integer>(); |
| for (String s : colInfo.keySet()) { |
| List<Integer> info = colInfo.get(s); |
| colTypes.put(s, info.get(0)); |
| } |
| return colTypes; |
| } |
| |
| @Override |
| public Map<String, List<Integer>> getColumnInfo(String tableName) { |
| String stmt = getColNamesQuery(tableName); |
| return getColumnInfoForRawQuery(stmt); |
| } |
| |
| @Override |
| public Map<String, List<Integer>> getColumnInfoForQuery(String query) { |
| // Manipulate the query to return immediately, with zero rows. |
| String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) "); |
| return getColumnInfoForRawQuery(rawQuery); |
| } |
| |
| protected Map<String, List<Integer>> getColumnInfoForRawQuery(String stmt) { |
| ResultSet results; |
| LOG.debug("Execute getColumnInfoRawQuery : " + stmt); |
| try { |
| results = execute(stmt); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "Error executing statement: " + sqlE.toString(), |
| sqlE); |
| release(); |
| return null; |
| } |
| |
| try { |
| Map<String, List<Integer>> colInfo = |
| new SqlTypeMap<String, List<Integer>>(); |
| |
| int cols = results.getMetaData().getColumnCount(); |
| ResultSetMetaData metadata = results.getMetaData(); |
| for (int i = 1; i < cols + 1; i++) { |
| int typeId = metadata.getColumnType(i); |
| int precision = metadata.getPrecision(i); |
| int scale = metadata.getScale(i); |
| |
| // If we have an unsigned int we need to make extra room by |
| // plopping it into a bigint |
        if (typeId == Types.INTEGER && !metadata.isSigned(i)) {
| typeId = Types.BIGINT; |
| } |
| |
| String colName = metadata.getColumnLabel(i); |
| if (colName == null || colName.equals("")) { |
| colName = metadata.getColumnName(i); |
| } |
| List<Integer> info = new ArrayList<Integer>(3); |
| info.add(Integer.valueOf(typeId)); |
| info.add(precision); |
| info.add(scale); |
| colInfo.put(colName, info); |
| LOG.debug("Found column " + colName + " of type " + info); |
| } |
| |
| return colInfo; |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading from database: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } finally { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, |
| "SQLException closing ResultSet: " + sqlE.toString(), sqlE); |
| } |
| |
| release(); |
| } |
| } |
| |
| @Override |
| public Map<String, String> getColumnTypeNamesForTable(String tableName) { |
| String stmt = getColTypesQuery(tableName); |
| return getColumnTypeNamesForRawQuery(stmt); |
| } |
| |
| @Override |
| public Map<String, String> getColumnTypeNamesForQuery(String query) { |
| // Manipulate the query to return immediately, with zero rows. |
| String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) "); |
| return getColumnTypeNamesForRawQuery(rawQuery); |
| } |
| |
| protected Map<String, String> getColumnTypeNamesForRawQuery(String stmt) { |
| ResultSet results; |
| try { |
| results = execute(stmt); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "Error executing statement: " + sqlE.toString(), |
| sqlE); |
| release(); |
| return null; |
| } |
| |
| try { |
| Map<String, String> colTypeNames = new HashMap<String, String>(); |
| |
| int cols = results.getMetaData().getColumnCount(); |
| ResultSetMetaData metadata = results.getMetaData(); |
| for (int i = 1; i < cols + 1; i++) { |
| String colTypeName = metadata.getColumnTypeName(i); |
| |
| String colName = metadata.getColumnLabel(i); |
| if (colName == null || colName.equals("")) { |
| colName = metadata.getColumnName(i); |
| } |
| |
| colTypeNames.put(colName, colTypeName); |
| LOG.debug("Found column " + colName + " of type " + colTypeName); |
| } |
| |
| return colTypeNames; |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading from database: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } finally { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "SQLException closing ResultSet: " |
| + sqlE.toString(), sqlE); |
| } |
| |
| release(); |
| } |
| } |
| |
| @Override |
| public ResultSet readTable(String tableName, String[] columns) |
| throws SQLException { |
| if (columns == null) { |
| columns = getColumnNames(tableName); |
| } |
| |
| StringBuilder sb = new StringBuilder(); |
| sb.append("SELECT "); |
| boolean first = true; |
| for (String col : columns) { |
| if (!first) { |
| sb.append(", "); |
| } |
| sb.append(escapeColName(col)); |
| first = false; |
| } |
| sb.append(" FROM "); |
| sb.append(escapeTableName(tableName)); |
| sb.append(" AS "); // needed for hsqldb; doesn't hurt anyone else. |
| sb.append(escapeTableName(tableName)); |
| |
| String sqlCmd = sb.toString(); |
| LOG.debug("Reading table with command: " + sqlCmd); |
| return execute(sqlCmd); |
| } |
| |
| @Override |
| public String[] listDatabases() { |
| // TODO(aaron): Implement this! |
| LOG.error("Generic SqlManager.listDatabases() not implemented."); |
| return null; |
| } |
| |
| @Override |
| public Map<String, Integer> getColumnTypesForProcedure(String procedureName) { |
| Map<String, List<Integer>> colInfo = |
| getColumnInfoForProcedure(procedureName); |
| if (colInfo == null) { |
| return null; |
| } |
| Map<String, Integer> colTypes = new SqlTypeMap<String, Integer>(); |
| for (String s : colInfo.keySet()) { |
| List<Integer> info = colInfo.get(s); |
| colTypes.put(s, info.get(0)); |
| } |
| return colTypes; |
| } |
| |
| @Override |
| public Map<String, List<Integer>> |
| getColumnInfoForProcedure(String procedureName) { |
| Map<String, List<Integer>> ret = new TreeMap<String, List<Integer>>(); |
| try { |
| DatabaseMetaData metaData = this.getConnection().getMetaData(); |
| ResultSet results = metaData.getProcedureColumns(null, null, |
| procedureName, null); |
| if (null == results) { |
| return null; |
| } |
| |
| try { |
| while (results.next()) { |
| if (results.getInt("COLUMN_TYPE") |
| != DatabaseMetaData.procedureColumnReturn |
| && results.getInt("ORDINAL_POSITION") > 0) { |
| // we don't care if we get several rows for the |
| // same ORDINAL_POSITION (e.g. like H2 gives us) |
| // as we'll just overwrite the entry in the map: |
| List<Integer> info = new ArrayList<Integer>(3); |
| info.add(results.getInt("DATA_TYPE")); |
| info.add(results.getInt("PRECISION")); |
| info.add(results.getInt("SCALE")); |
| ret.put(results.getString("COLUMN_NAME"), info); |
| } |
| } |
| LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); |
| LOG.debug("Types returned = " + StringUtils.join(ret.values(), ",")); |
| return ret.isEmpty() ? null : ret; |
| } finally { |
| results.close(); |
| getConnection().commit(); |
| } |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading primary key metadata: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } |
| } |
| |
| @Override |
| public Map<String, String> |
| getColumnTypeNamesForProcedure(String procedureName) { |
| Map<String, String> ret = new TreeMap<String, String>(); |
| try { |
| DatabaseMetaData metaData = this.getConnection().getMetaData(); |
| ResultSet results = metaData.getProcedureColumns(null, null, |
| procedureName, null); |
| if (null == results) { |
| return null; |
| } |
| |
| try { |
| while (results.next()) { |
| if (results.getInt("COLUMN_TYPE") |
| != DatabaseMetaData.procedureColumnReturn |
| && results.getInt("ORDINAL_POSITION") > 0) { |
| // we don't care if we get several rows for the |
| // same ORDINAL_POSITION (e.g. like H2 gives us) |
| // as we'll just overwrite the entry in the map: |
| ret.put( |
| results.getString("COLUMN_NAME"), |
| results.getString("TYPE_NAME")); |
| } |
| } |
| LOG.debug("Columns returned = " + StringUtils.join(ret.keySet(), ",")); |
| LOG.debug( |
| "Type names returned = " + StringUtils.join(ret.values(), ",")); |
| return ret.isEmpty() ? null : ret; |
| } finally { |
| results.close(); |
| getConnection().commit(); |
| } |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading primary key metadata: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } |
| } |
| |
| @Override |
| public String[] listTables() { |
| ResultSet results = null; |
| String [] tableTypes = {"TABLE"}; |
| try { |
| try { |
| DatabaseMetaData metaData = this.getConnection().getMetaData(); |
| results = metaData.getTables(null, null, null, tableTypes); |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading database metadata: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } |
| |
| if (null == results) { |
| return null; |
| } |
| |
| try { |
| ArrayList<String> tables = new ArrayList<String>(); |
| while (results.next()) { |
| String tableName = results.getString("TABLE_NAME"); |
| tables.add(tableName); |
| } |
| |
| return tables.toArray(new String[0]); |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading from database: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } |
| } finally { |
| if (null != results) { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "Exception closing ResultSet: " |
| + sqlE.toString(), sqlE); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public String getPrimaryKey(String tableName) { |
| try { |
| DatabaseMetaData metaData = this.getConnection().getMetaData(); |
| ResultSet results = metaData.getPrimaryKeys(null, null, tableName); |
| if (null == results) { |
| return null; |
| } |
| |
| try { |
| if (results.next()) { |
| return results.getString("COLUMN_NAME"); |
| } else { |
| return null; |
| } |
| } finally { |
| results.close(); |
| getConnection().commit(); |
| } |
| } catch (SQLException sqlException) { |
| LoggingUtils.logAll(LOG, "Error reading primary key metadata: " |
| + sqlException.toString(), sqlException); |
| return null; |
| } |
| } |
| |
| /** |
| * Retrieve the actual connection from the outer ConnManager. |
| */ |
| public abstract Connection getConnection() throws SQLException; |
| |
| /** |
| * Determine what column to use to split the table. |
| * @param opts the SqoopOptions controlling this import. |
| * @param tableName the table to import. |
   * @return the splitting column, if one is set or inferable, or null
| * otherwise. |
| */ |
| protected String getSplitColumn(SqoopOptions opts, String tableName) { |
| String splitCol = opts.getSplitByCol(); |
| if (null == splitCol && null != tableName) { |
| // If the user didn't specify a splitting column, try to infer one. |
| splitCol = getPrimaryKey(tableName); |
| } |
| |
| return splitCol; |
| } |
| |
| /** |
| * Offers the ConnManager an opportunity to validate that the |
| * options specified in the ImportJobContext are valid. |
| * @throws ImportException if the import is misconfigured. |
| */ |
| protected void checkTableImportOptions( |
| org.apache.sqoop.manager.ImportJobContext context) |
| throws IOException, ImportException { |
| String tableName = context.getTableName(); |
| SqoopOptions opts = context.getOptions(); |
| |
| // Default implementation: check that the split column is set |
| // correctly. |
| String splitCol = getSplitColumn(opts, tableName); |
| if (null == splitCol && opts.getNumMappers() > 1) { |
| if (!opts.getAutoResetToOneMapper()) { |
| // Can't infer a primary key. |
| throw new ImportException("No primary key could be found for table " |
| + tableName + ". Please specify one with --split-by or perform " |
| + "a sequential import with '-m 1'."); |
| } else { |
| LOG.warn("Split by column not provided or can't be inferred. Resetting to one mapper"); |
| opts.setNumMappers(1); |
| } |
| } |
| } |
| |
| /** |
| * Default implementation of importTable() is to launch a MapReduce job |
| * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat. |
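   * <p>A typical command line that exercises this path (a sketch; the
   * connect string and table are hypothetical):
   * <pre>{@code
   * sqoop import --connect jdbc:mysql://db.example.com/corp \
   *     --table employees --split-by id -m 4
   * }</pre>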
| */ |
| public void importTable(org.apache.sqoop.manager.ImportJobContext context) |
| throws IOException, ImportException { |
| String tableName = context.getTableName(); |
| String jarFile = context.getJarFile(); |
| SqoopOptions opts = context.getOptions(); |
| |
| context.setConnManager(this); |
| |
| ImportJobBase importer; |
| if (opts.getHBaseTable() != null) { |
| // Import to HBase. |
| if (!HBaseUtil.isHBaseJarPresent()) { |
| throw new ImportException("HBase jars are not present in " |
| + "classpath, cannot import to HBase!"); |
| } |
      if (!opts.isBulkLoadEnabled()) {
| importer = new HBaseImportJob(opts, context); |
| } else { |
| importer = new HBaseBulkImportJob(opts, context); |
| } |
| } else if (opts.getAccumuloTable() != null) { |
| // Import to Accumulo. |
| if (!AccumuloUtil.isAccumuloJarPresent()) { |
| throw new ImportException("Accumulo jars are not present in " |
| + "classpath, cannot import to Accumulo!"); |
| } |
| importer = new AccumuloImportJob(opts, context); |
| } else { |
| // Import to HDFS. |
| importer = new DataDrivenImportJob(opts, context.getInputFormat(), |
| context); |
| } |
| |
| checkTableImportOptions(context); |
| |
| String splitCol = getSplitColumn(opts, tableName); |
| importer.runImport(tableName, jarFile, splitCol, opts.getConf()); |
| } |
| |
| /** |
| * Default implementation of importQuery() is to launch a MapReduce job |
| * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat, |
| * using its free-form query importer. |
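   * <p>A sketch of a parallel free-form import (names and paths are
   * hypothetical):
   * <pre>{@code
   * sqoop import --connect jdbc:mysql://db.example.com/corp \
   *     --query 'SELECT a.id, b.name FROM a JOIN b ON (a.id = b.id)
   *       WHERE $CONDITIONS' \
   *     --split-by a.id --target-dir /user/foo/joinresults
   * }</pre>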
| */ |
| public void importQuery(org.apache.sqoop.manager.ImportJobContext context) |
| throws IOException, ImportException { |
| String jarFile = context.getJarFile(); |
| SqoopOptions opts = context.getOptions(); |
| |
| context.setConnManager(this); |
| |
| ImportJobBase importer; |
| if (opts.getHBaseTable() != null) { |
| // Import to HBase. |
| if (!HBaseUtil.isHBaseJarPresent()) { |
| throw new ImportException("HBase jars are not present in classpath," |
| + " cannot import to HBase!"); |
| } |
      if (!opts.isBulkLoadEnabled()) {
| importer = new HBaseImportJob(opts, context); |
| } else { |
| importer = new HBaseBulkImportJob(opts, context); |
| } |
| } else if (opts.getAccumuloTable() != null) { |
| // Import to Accumulo. |
| if (!AccumuloUtil.isAccumuloJarPresent()) { |
| throw new ImportException("Accumulo jars are not present in classpath," |
| + " cannot import to Accumulo!"); |
| } |
| importer = new AccumuloImportJob(opts, context); |
| } else { |
| // Import to HDFS. |
| importer = new DataDrivenImportJob(opts, context.getInputFormat(), |
| context); |
| } |
| |
| String splitCol = getSplitColumn(opts, null); |
| if (splitCol == null) { |
| String boundaryQuery = opts.getBoundaryQuery(); |
| if (opts.getNumMappers() > 1) { |
| // Can't infer a primary key. |
| throw new ImportException("A split-by column must be specified for " |
| + "parallel free-form query imports. Please specify one with " |
| + "--split-by or perform a sequential import with '-m 1'."); |
| } else if (boundaryQuery != null && !boundaryQuery.isEmpty()) { |
| // Query import with boundary query and no split column specified |
| throw new ImportException("Using a boundary query for a query based " |
| + "import requires specifying the split by column as well. Please " |
| + "specify a column name using --split-by and try again."); |
| } |
| } |
| |
| importer.runImport(null, jarFile, splitCol, opts.getConf()); |
| } |
| |
| /** |
| * Executes an arbitrary SQL statement. |
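   * <p>A usage sketch from a subclass (the query and bind value are
   * hypothetical):
   * <pre>{@code
   * ResultSet rs = execute("SELECT * FROM foo WHERE id > ?", 1000, 42);
   * }</pre>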
| * @param stmt The SQL statement to execute |
   * @param fetchSize overrides the default or configured fetch size.
   * @param args optional bind parameters for the prepared statement.
   * @return a ResultSet encapsulating the results.
   * @throws SQLException if the statement cannot be prepared or executed.
| */ |
| protected ResultSet execute(String stmt, Integer fetchSize, Object... args) |
| throws SQLException { |
| // Release any previously-open statement. |
| release(); |
| |
    PreparedStatement statement = this.getConnection().prepareStatement(stmt,
| ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); |
| if (fetchSize != null) { |
| LOG.debug("Using fetchSize for next query: " + fetchSize); |
| statement.setFetchSize(fetchSize); |
| } |
| this.lastStatement = statement; |
| if (null != args) { |
| for (int i = 0; i < args.length; i++) { |
| statement.setObject(i + 1, args[i]); |
| } |
| } |
| |
| LOG.info("Executing SQL statement: " + stmt); |
| return statement.executeQuery(); |
| } |
| |
| /** |
   * Executes an arbitrary SQL statement using the configured fetch size.
   * @param stmt the SQL statement to execute.
   * @param args optional bind parameters for the prepared statement.
   * @return a ResultSet encapsulating the results.
   * @throws SQLException if the statement cannot be prepared or executed.
| */ |
| protected ResultSet execute(String stmt, Object... args) throws SQLException { |
| return execute(stmt, options.getFetchSize(), args); |
| } |
| |
| public void close() throws SQLException { |
| release(); |
| } |
| |
| /** |
| * Prints the contents of a ResultSet to the specified PrintWriter. |
| * The ResultSet is closed at the end of this method. |
| * @param results the ResultSet to print. |
| * @param pw the location to print the data to. |
| */ |
| protected void formatAndPrintResultSet(ResultSet results, PrintWriter pw) { |
| try { |
| try { |
| int cols = results.getMetaData().getColumnCount(); |
| pw.println("Got " + cols + " columns back"); |
| if (cols > 0) { |
| ResultSetMetaData rsmd = results.getMetaData(); |
| String schema = rsmd.getSchemaName(1); |
| String table = rsmd.getTableName(1); |
| if (null != schema) { |
| pw.println("Schema: " + schema); |
| } |
| |
| if (null != table) { |
| pw.println("Table: " + table); |
| } |
| } |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "SQLException reading result metadata: " |
| + sqlE.toString(), sqlE); |
| } |
| |
| try { |
| new ResultSetPrinter().printResultSet(pw, results); |
| } catch (IOException ioe) { |
| LOG.error("IOException writing results: " + ioe.toString()); |
| return; |
| } |
| } finally { |
| try { |
| results.close(); |
| getConnection().commit(); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "SQLException closing ResultSet: " |
| + sqlE.toString(), sqlE); |
| } |
| |
| release(); |
| } |
| } |
| |
| /** |
| * Poor man's SQL query interface; used for debugging. |
| * @param s the SQL statement to execute. |
| */ |
| public void execAndPrint(String s) { |
| ResultSet results = null; |
| try { |
| results = execute(s); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "Error executing statement: ", sqlE); |
| release(); |
| return; |
| } |
| |
| PrintWriter pw = new PrintWriter(System.out, true); |
| try { |
| formatAndPrintResultSet(results, pw); |
| } finally { |
| pw.close(); |
| } |
| } |
| |
| /** |
| * Create a connection to the database; usually used only from within |
| * getConnection(), which enforces a singleton guarantee around the |
| * Connection object. |
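   * <p>When the user supplies connection parameters (for example via
   * --connection-param-file), they are forwarded to the JDBC driver in a
   * java.util.Properties object along with the username and password.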
| */ |
| protected Connection makeConnection() throws SQLException { |
| |
| Connection connection; |
| String driverClass = getDriverClass(); |
| |
| try { |
| Class.forName(driverClass); |
| } catch (ClassNotFoundException cnfe) { |
      throw new RuntimeException("Could not load db driver class: "
          + driverClass, cnfe);
| } |
| |
| String username = options.getUsername(); |
| String password = options.getPassword(); |
| String connectString = options.getConnectString(); |
| Properties connectionParams = options.getConnectionParams(); |
| if (connectionParams != null && connectionParams.size() > 0) { |
| LOG.debug("User specified connection params. " |
| + "Using properties specific API for making connection."); |
| |
| Properties props = new Properties(); |
| if (username != null) { |
| props.put("user", username); |
| } |
| |
| if (password != null) { |
| props.put("password", password); |
| } |
| |
| props.putAll(connectionParams); |
| connection = DriverManager.getConnection(connectString, props); |
| } else { |
| LOG.debug("No connection paramenters specified. " |
| + "Using regular API for making connection."); |
| if (username == null) { |
| connection = DriverManager.getConnection(connectString); |
| } else { |
| connection = DriverManager.getConnection( |
| connectString, username, password); |
| } |
| } |
| |
| // We only use this for metadata queries. Loosest semantics are okay. |
| connection.setTransactionIsolation(getMetadataIsolationLevel()); |
| connection.setAutoCommit(false); |
| |
| return connection; |
| } |
| |
| /** |
| * @return the transaction isolation level to use for metadata queries |
| * (queries executed by the ConnManager itself). |
| */ |
| protected int getMetadataIsolationLevel() { |
| return options.getMetadataTransactionIsolationLevel(); |
| } |
| |
| /** |
| * Export data stored in HDFS into a table in a database. |
| */ |
| public void exportTable(org.apache.sqoop.manager.ExportJobContext context) |
| throws IOException, ExportException { |
| context.setConnManager(this); |
| JdbcExportJob exportJob = new JdbcExportJob(context); |
| exportJob.runExport(); |
| } |
| |
| @Override |
| public void callTable(org.apache.sqoop.manager.ExportJobContext context) |
| throws IOException, |
| ExportException { |
| context.setConnManager(this); |
| JdbcCallExportJob exportJob = new JdbcCallExportJob(context); |
| exportJob.runExport(); |
| } |
| |
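  /**
   * Closes and discards the statement opened by the most recent execute()
   * call, if any.
   */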
| public void release() { |
| if (null != this.lastStatement) { |
| try { |
| this.lastStatement.close(); |
| } catch (SQLException e) { |
| LoggingUtils.logAll(LOG, "Exception closing executed Statement: " |
| + e, e); |
| } |
| |
| this.lastStatement = null; |
| } |
| } |
| |
| @Override |
| /** |
| * {@inheritDoc} |
| */ |
| public void updateTable( |
| org.apache.sqoop.manager.ExportJobContext context) |
| throws IOException, ExportException { |
| context.setConnManager(this); |
| JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context); |
| exportJob.runExport(); |
| } |
| |
| /** |
| * @return a SQL query to retrieve the current timestamp from the db. |
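   * Subclasses may override this where the syntax differs (Oracle, for
   * example, uses "SELECT SYSDATE FROM dual").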
| */ |
| protected String getCurTimestampQuery() { |
| return "SELECT CURRENT_TIMESTAMP()"; |
| } |
| |
| @Override |
| /** |
| * {@inheritDoc} |
| */ |
| public Timestamp getCurrentDbTimestamp() { |
| release(); // Release any previous ResultSet. |
| |
| Statement s = null; |
| ResultSet rs = null; |
| try { |
| Connection c = getConnection(); |
| s = c.createStatement(); |
| rs = s.executeQuery(getCurTimestampQuery()); |
| if (rs == null || !rs.next()) { |
| return null; // empty ResultSet. |
| } |
| |
| return rs.getTimestamp(1); |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "SQL exception accessing current timestamp: " |
| + sqlE, sqlE); |
| return null; |
| } finally { |
| try { |
| if (null != rs) { |
| rs.close(); |
| } |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "SQL Exception closing resultset: " |
| + sqlE, sqlE); |
| } |
| |
| try { |
| if (null != s) { |
| s.close(); |
| } |
| } catch (SQLException sqlE) { |
| LoggingUtils.logAll(LOG, "SQL Exception closing statement: " |
| + sqlE, sqlE); |
| } |
| } |
| } |
| |
| @Override |
| public long getTableRowCount(String tableName) throws SQLException { |
| release(); // Release any previous ResultSet |
| |
    // Escape the table name before embedding it in the query.
| tableName = escapeTableName(tableName); |
| |
| long result = -1; |
| String countQuery = "SELECT COUNT(*) FROM " + tableName; |
| Statement stmt = null; |
| ResultSet rset = null; |
| try { |
| Connection conn = getConnection(); |
| stmt = conn.createStatement(); |
| rset = stmt.executeQuery(countQuery); |
| rset.next(); |
| result = rset.getLong(1); |
| } catch (SQLException ex) { |
| LoggingUtils.logAll(LOG, "Unable to query count * for table " |
| + tableName, ex); |
| throw ex; |
| } finally { |
| if (rset != null) { |
| try { |
| rset.close(); |
| } catch (SQLException ex) { |
| LoggingUtils.logAll(LOG, "Unable to close result set", ex); |
| } |
| } |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException ex) { |
| LoggingUtils.logAll(LOG, "Unable to close statement", ex); |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public void deleteAllRecords(String tableName) throws SQLException { |
| release(); // Release any previous ResultSet |
| |
| // Escape table name |
| tableName = escapeTableName(tableName); |
| |
| String deleteQuery = "DELETE FROM " + tableName; |
| Statement stmt = null; |
| try { |
| Connection conn = getConnection(); |
| stmt = conn.createStatement(); |
| int updateCount = stmt.executeUpdate(deleteQuery); |
| conn.commit(); |
| LOG.info("Deleted " + updateCount + " records from " + tableName); |
| } catch (SQLException ex) { |
| LoggingUtils.logAll(LOG, "Unable to execute delete query: " |
| + deleteQuery, ex); |
| throw ex; |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException ex) { |
| LoggingUtils.logAll(LOG, "Unable to close statement", ex); |
| } |
| } |
| } |
| } |
| |
| @Override |
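  /**
   * {@inheritDoc}
   * <p>Moves all rows from fromTable into toTable in one transaction: an
   * INSERT ... SELECT followed by a DELETE, rolled back if the two row
   * counts disagree.
   */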
| public void migrateData(String fromTable, String toTable) |
| throws SQLException { |
| release(); // Release any previous ResultSet |
| |
| // Escape all table names |
| fromTable = escapeTableName(fromTable); |
| toTable = escapeTableName(toTable); |
| |
| String updateQuery = "INSERT INTO " + toTable |
| + " ( SELECT * FROM " + fromTable + " )"; |
| |
| String deleteQuery = "DELETE FROM " + fromTable; |
| Statement stmt = null; |
| try { |
| Connection conn = getConnection(); |
| stmt = conn.createStatement(); |
| |
| // Insert data from the fromTable to the toTable |
| int updateCount = stmt.executeUpdate(updateQuery); |
| LOG.info("Migrated " + updateCount + " records from " + fromTable |
| + " to " + toTable); |
| |
| // Delete the records from the fromTable |
| int deleteCount = stmt.executeUpdate(deleteQuery); |
| |
| // If the counts do not match, fail the transaction |
| if (updateCount != deleteCount) { |
| conn.rollback(); |
| throw new RuntimeException("Inconsistent record counts"); |
| } |
| conn.commit(); |
| } catch (SQLException ex) { |
| LoggingUtils.logAll(LOG, "Unable to migrate data from " |
| + fromTable + " to " + toTable, ex); |
| throw ex; |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException ex) { |
| LoggingUtils.logAll(LOG, "Unable to close statement", ex); |
| } |
| } |
| } |
| } |
| |
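  /**
   * Returns the boundary query used to compute split ranges. The base
   * implementation simply returns the user-supplied --boundary-query value
   * (null if none was given), ignoring both arguments.
   * @param splitByCol the column used to split the workload.
   * @param sanitizedQuery the sanitized form of the import query.
   */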
| public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) { |
| return options.getBoundaryQuery(); |
| } |
| } |