| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.sqoop.repository.derby; |
| |
| import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*; |
| |
| import java.net.URL; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Timestamp; |
| import java.sql.Types; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.log4j.Logger; |
| import org.apache.sqoop.common.Direction; |
| import org.apache.sqoop.common.DirectionError; |
| import org.apache.sqoop.common.SqoopException; |
| import org.apache.sqoop.common.SupportedDirections; |
| import org.apache.sqoop.connector.ConnectorHandler; |
| import org.apache.sqoop.connector.ConnectorManagerUtils; |
| import org.apache.sqoop.model.MBooleanInput; |
| import org.apache.sqoop.model.MConfig; |
| import org.apache.sqoop.model.MConfigType; |
| import org.apache.sqoop.model.MConnector; |
| import org.apache.sqoop.model.MDriver; |
| import org.apache.sqoop.model.MDriverConfig; |
| import org.apache.sqoop.model.MEnumInput; |
| import org.apache.sqoop.model.MFromConfig; |
| import org.apache.sqoop.model.MInput; |
| import org.apache.sqoop.model.MInputType; |
| import org.apache.sqoop.model.MIntegerInput; |
| import org.apache.sqoop.model.MJob; |
| import org.apache.sqoop.model.MLink; |
| import org.apache.sqoop.model.MLinkConfig; |
| import org.apache.sqoop.model.MMapInput; |
| import org.apache.sqoop.model.MStringInput; |
| import org.apache.sqoop.model.MSubmission; |
| import org.apache.sqoop.model.MToConfig; |
| import org.apache.sqoop.repository.JdbcRepositoryContext; |
| import org.apache.sqoop.repository.JdbcRepositoryHandler; |
| import org.apache.sqoop.submission.SubmissionStatus; |
| import org.apache.sqoop.submission.counter.Counter; |
| import org.apache.sqoop.submission.counter.CounterGroup; |
| import org.apache.sqoop.submission.counter.Counters; |
| |
| /** |
| * JDBC based repository handler for the Derby database. |
| * |
| * Creates and upgrades the repository schema in Derby and persists |
| * connectors, configs, links, jobs and submissions through JDBC. |
| */ |
| public class DerbyRepositoryHandler extends JdbcRepositoryHandler { |
| |
| private static final Logger LOG = |
| Logger.getLogger(DerbyRepositoryHandler.class); |
| |
| private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME = |
| "org.apache.derby.jdbc.EmbeddedDriver"; |
| |
| /** |
| * Unique name of HDFS Connector. |
| * HDFS Connector was originally part of the Sqoop driver, but now is its |
| * own connector. This constant is used to pre-register the HDFS Connector |
| * so that jobs that are being upgraded can reference the HDFS Connector. |
| */ |
| private static final String CONNECTOR_HDFS = "hdfs-connector"; |
| |
| private static final String LINK_HDFS = "hdfs-link"; |
| |
| private JdbcRepositoryContext repoContext; |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void registerConnector(MConnector mc, Connection conn) { |
| if (mc.hasPersistenceId()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0011, |
| mc.getUniqueName()); |
| } |
| mc.setPersistenceId(getConnectorId(mc, conn)); |
| insertConfigsForConnector(mc, conn); |
| } |
| |
| /** |
| * Helper method to insert the configs from the into the |
| * repository. |
| * @param mDriver The driver instance to use to upgrade. |
| * @param conn JDBC link to use for updating the configs |
| */ |
| private void insertConfigsForDriver(MDriver mDriver, Connection conn) { |
| PreparedStatement baseConfigStmt = null; |
| PreparedStatement baseInputStmt = null; |
| try{ |
| baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, |
| Statement.RETURN_GENERATED_KEYS); |
| |
| baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, |
| Statement.RETURN_GENERATED_KEYS); |
| |
| // Register the job config type, since driver config is per job |
| registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(), |
| MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); |
| |
| } catch (SQLException ex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mDriver.toString(), ex); |
| } finally { |
| closeStatements(baseConfigStmt, baseInputStmt); |
| } |
| } |
| |
| /** |
| * Helper method to insert the configs from the MConnector into the |
| * repository. The job and connector configs within <code>mc</code> will get |
| * updated with the id of the configs when this function returns. |
| * @param mc The connector to use for updating configs |
| * @param conn JDBC link to use for updating the configs |
| */ |
| private void insertConfigsForConnector (MConnector mc, Connection conn) { |
| long connectorId = mc.getPersistenceId(); |
| PreparedStatement baseConfigStmt = null; |
| PreparedStatement baseInputStmt = null; |
| try{ |
| baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, |
| Statement.RETURN_GENERATED_KEYS); |
| |
| baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, |
| Statement.RETURN_GENERATED_KEYS); |
| |
| // Register link type config |
| registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(), |
| MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn); |
| |
| // Register both from/to job type config |
| if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) { |
| registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(), |
| MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); |
| } |
| if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) { |
| registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(), |
| MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); |
| } |
| } catch (SQLException ex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0014, |
| mc.toString(), ex); |
| } finally { |
| closeStatements(baseConfigStmt, baseInputStmt); |
| } |
| |
| } |
| |
| private void insertConnectorDirection(Long connectorId, Direction direction, Connection conn) |
| throws SQLException { |
| PreparedStatement stmt = null; |
| |
| try { |
| stmt = conn.prepareStatement(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS); |
| stmt.setLong(1, connectorId); |
| stmt.setLong(2, getDirection(direction, conn)); |
| |
| if (stmt.executeUpdate() != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0049); |
| } |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| private void insertConnectorDirections(Long connectorId, SupportedDirections directions, Connection conn) |
| throws SQLException { |
| if (directions.isDirectionSupported(Direction.FROM)) { |
| insertConnectorDirection(connectorId, Direction.FROM, conn); |
| } |
| |
| if (directions.isDirectionSupported(Direction.TO)) { |
| insertConnectorDirection(connectorId, Direction.TO, conn); |
| } |
| } |
| |
| private long getConnectorId(MConnector mc, Connection conn) { |
| PreparedStatement baseConnectorStmt = null; |
| try { |
| baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE, |
| Statement.RETURN_GENERATED_KEYS); |
| baseConnectorStmt.setString(1, mc.getUniqueName()); |
| baseConnectorStmt.setString(2, mc.getClassName()); |
| baseConnectorStmt.setString(3, mc.getVersion()); |
| |
| int baseConnectorCount = baseConnectorStmt.executeUpdate(); |
| if (baseConnectorCount != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0012, |
| Integer.toString(baseConnectorCount)); |
| } |
| |
| ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys(); |
| |
| if (!rsetConnectorId.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| insertConnectorDirections(rsetConnectorId.getLong(1), |
| mc.getSupportedDirections(), conn); |
| |
| return rsetConnectorId.getLong(1); |
| } catch (SQLException ex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0014, |
| mc.toString(), ex); |
| } finally { |
| closeStatements(baseConnectorStmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public synchronized void initialize(JdbcRepositoryContext ctx) { |
| repoContext = ctx; |
| repoContext.getDataSource(); |
| LOG.info("DerbyRepositoryHandler initialized."); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public synchronized void shutdown() { |
| String driver = repoContext.getDriverClass(); |
| if (driver != null && driver.equals(EMBEDDED_DERBY_DRIVER_CLASSNAME)) { |
| // Using embedded derby. Needs explicit shutdown |
| String connectUrl = repoContext.getConnectionUrl(); |
| if (connectUrl.startsWith("jdbc:derby:")) { |
| int index = connectUrl.indexOf(';'); |
| String baseUrl = null; |
| if (index != -1) { |
| baseUrl = connectUrl.substring(0, index+1); |
| } else { |
| baseUrl = connectUrl + ";"; |
| } |
| String shutDownUrl = baseUrl + "shutdown=true"; |
| |
| LOG.debug("Attempting to shutdown embedded Derby using URL: " |
| + shutDownUrl); |
| |
| try { |
| DriverManager.getConnection(shutDownUrl); |
| } catch (SQLException ex) { |
| // Shutdown for one db instance is expected to raise SQL STATE 45000 |
| if (ex.getErrorCode() != 45000) { |
| throw new SqoopException( |
| DerbyRepoError.DERBYREPO_0002, shutDownUrl, ex); |
| } |
| LOG.info("Embedded Derby shutdown raised SQL STATE " |
| + "45000 as expected."); |
| } |
| } else { |
| LOG.warn("Even though embedded Derby driver was loaded, the connect " |
| + "URL is of an unexpected config: " + connectUrl + ". Therefore no " |
| + "attempt will be made to shutdown embedded Derby instance."); |
| } |
| } |
| } |
| |
| /** |
| * Detect version of underlying database structures. |
| * |
| * @param conn JDBC Connection |
| * @return |
| */ |
| public int detectVersion(Connection conn) { |
| ResultSet rs = null; |
| PreparedStatement stmt = null; |
| |
| // First release went out without system table, so we have to detect |
| // this version differently. |
| try { |
| rs = conn.getMetaData().getTables(null, null, null, null); |
| |
| Set<String> tableNames = new HashSet<String>(); |
| while(rs.next()) { |
| tableNames.add(rs.getString("TABLE_NAME")); |
| } |
| closeResultSets(rs); |
| |
| LOG.debug("Detecting old version of repository"); |
| boolean foundAll = true; |
| for( String expectedTable : DerbySchemaConstants.tablesV1) { |
| if(!tableNames.contains(expectedTable)) { |
| foundAll = false; |
| LOG.debug("Missing table " + expectedTable); |
| } |
| } |
| |
| // If we find all expected tables, then we are on version 1 |
| if(foundAll && !tableNames.contains(DerbySchemaConstants.TABLE_SQ_SYSTEM_NAME)) { |
| return 1; |
| } |
| |
| } catch (SQLException e) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0041, e); |
| } finally { |
| closeResultSets(rs); |
| } |
| |
| // Normal version detection, select and return the version |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_SYSTEM); |
| stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION); |
| rs = stmt.executeQuery(); |
| |
| if(!rs.next()) { |
| return 0; |
| } |
| |
| return rs.getInt(1); |
| } catch (SQLException e) { |
| LOG.info("Can't fetch repository structure version.", e); |
| return 0; |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Detect version of the driver |
| * |
| * @param conn Connection to the repository |
| * @return Version of the Driver |
| */ |
| private String detectDriverVersion (Connection conn) { |
| ResultSet rs = null; |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM); |
| stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION); |
| rs = stmt.executeQuery(); |
| if(!rs.next()) { |
| return null; |
| } |
| return rs.getString(1); |
| } catch (SQLException e) { |
| LOG.info("Can't fetch driver version.", e); |
| return null; |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Create or update driver version |
| * @param conn Connection to the the repository |
| * @param mDriver |
| */ |
| private void createOrUpdateDriverSystemVersion(Connection conn, String version) { |
| ResultSet rs = null; |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_DELETE_SYSTEM); |
| stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION); |
| stmt.executeUpdate(); |
| closeStatements(stmt); |
| |
| stmt = conn.prepareStatement(STMT_INSERT_SYSTEM); |
| stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION); |
| stmt.setString(2, version); |
| stmt.executeUpdate(); |
| } catch (SQLException e) { |
| logException(e); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createOrUpdateInternals(Connection conn) { |
| int version = detectVersion(conn); |
| |
| if(version <= 0) { |
| runQuery(QUERY_CREATE_SCHEMA_SQOOP, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_CONFIG, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_INPUT, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_LINK, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_JOB, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_COUNTER, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn); |
| } |
| if(version <= 1) { |
| runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER, conn); |
| } |
| if(version <= 2) { |
| runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTOR_MODIFY_COLUMN_SQC_VERSION_VARCHAR_64, conn); |
| } |
| if(version <= 3) { |
| // Schema modifications |
| runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO, conn); |
| |
| // Data modifications only for non-fresh install. |
| if (version > 0) { |
| // Register HDFS connector |
| updteJobInternals(conn, registerHdfsConnector(conn)); |
| } |
| |
| // Change direction from VARCHAR to BIGINT + foreign key. |
| updateDirections(conn, insertDirections(conn)); |
| |
| // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data. |
| // Data updates depend on knowledge of the type of job. |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn); |
| |
| // Add unique constraints on job and links. |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn); |
| } |
| |
| ResultSet rs = null; |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_DELETE_SYSTEM); |
| stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION); |
| stmt.executeUpdate(); |
| |
| closeStatements(stmt); |
| |
| stmt = conn.prepareStatement(STMT_INSERT_SYSTEM); |
| stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION); |
| stmt.setString(2, "" + DerbyRepoConstants.VERSION); |
| stmt.executeUpdate(); |
| } catch (SQLException e) { |
| LOG.error("Can't persist the repository version", e); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Insert directions: FROM and TO. |
| * @param conn |
| * @return Map<Direction, Long> direction ID => Direction |
| */ |
| protected Map<Direction, Long> insertDirections(Connection conn) { |
| // Add directions |
| Map<Direction, Long> directionMap = new TreeMap<Direction, Long>(); |
| PreparedStatement insertDirectionStmt = null; |
| try { |
| // Insert directions and get IDs. |
| for (Direction direction : Direction.values()) { |
| insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS); |
| insertDirectionStmt.setString(1, direction.toString()); |
| if (insertDirectionStmt.executeUpdate() != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0046, "Could not add directions FROM and TO."); |
| } |
| |
| ResultSet directionId = insertDirectionStmt.getGeneratedKeys(); |
| if (directionId.next()) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Loaded direction: " + directionId.getLong(1)); |
| } |
| |
| directionMap.put(direction, directionId.getLong(1)); |
| } else { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0047, "Could not get ID of direction " + direction); |
| } |
| } |
| } catch (SQLException e) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e); |
| } finally { |
| closeStatements(insertDirectionStmt); |
| } |
| |
| return directionMap; |
| } |
| |
| /** |
| * Add normalized M2M for SQ_CONNECTOR and SQ_CONFIG for Direction. |
| * 1. Remember all ID => direction for configs. |
| * 2. Drop SQF_DIRECTION (varhchar). |
| * 3. Add new M2M tables for SQ_CONNECTOR and SQ_CONFIG. |
| * 4. Add directions via updating SQ_CONFIG with proper Direction IDs. |
| * 5. Make sure all connectors have all supported directions. |
| * @param conn |
| */ |
| protected void updateDirections(Connection conn, Map<Direction, Long> directionMap) { |
| // Remember directions |
| Statement fetchFormsStmt = null, |
| fetchConnectorsStmt = null; |
| List<Long> connectorIds = new LinkedList<Long>(); |
| List<Long> configIds = new LinkedList<Long>(); |
| List<String> directions = new LinkedList<String>(); |
| try { |
| fetchFormsStmt = conn.createStatement(); |
| ResultSet rs = fetchFormsStmt.executeQuery(STMT_FETCH_CONFIG_DIRECTIONS); |
| while (rs.next()) { |
| configIds.add(rs.getLong(1)); |
| directions.add(rs.getString(2)); |
| } |
| rs.close(); |
| } catch (SQLException e) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e); |
| } finally { |
| closeStatements(fetchFormsStmt); |
| } |
| |
| // Change Schema |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn); |
| runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn); |
| |
| // Add directions back |
| while (!configIds.isEmpty() && !directions.isEmpty()) { |
| Long configId = configIds.remove(0); |
| String directionString = directions.remove(0); |
| if (directionString != null && !directionString.isEmpty()) { |
| Direction direction = Direction.valueOf(directionString); |
| runQuery(STMT_INSERT_SQ_CONFIG_DIRECTIONS, conn, configId, directionMap.get(direction)); |
| } |
| } |
| |
| // Add connector directions |
| try { |
| fetchConnectorsStmt = conn.createStatement(); |
| ResultSet rs = fetchConnectorsStmt.executeQuery(STMT_SELECT_CONNECTOR_ALL); |
| while (rs.next()) { |
| connectorIds.add(rs.getLong(1)); |
| } |
| rs.close(); |
| } catch (SQLException e) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e); |
| } finally { |
| closeStatements(fetchConnectorsStmt); |
| } |
| |
| for (Long connectorId : connectorIds) { |
| for (Long directionId : directionMap.values()) { |
| runQuery(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS, conn, connectorId, directionId); |
| } |
| } |
| } |
| |
| /** |
| * Upgrade job data from IMPORT/EXPORT to FROM/TO. |
| * Since the framework is no longer responsible for HDFS, |
| * the HDFS connector/link must be added. |
| * Also, the framework configs are moved around such that |
| * they belong to the added HDFS connector. Any extra configs |
| * are removed. |
| * NOTE: Connector configs should have a direction (FROM/TO), |
| * but framework configs should not. |
| * |
| * Here's a brief list describing the data migration process. |
| * 1. Change SQ_CONFIG.SQ_CFG_DIRECTION from IMPORT to FROM. |
| * 2. Change SQ_CONFIG.SQ_CFG_DIRECTION from EXPORT to TO. |
| * 3. Change EXPORT to TO in newly existing SQ_CFG_DIRECTION. |
| * This should affect connectors only since Connector configs |
| * should have had a value for SQ_CFG_OPERATION. |
| * 4. Change IMPORT to FROM in newly existing SQ_CFG_DIRECTION. |
| * This should affect connectors only since Connector configs |
| * should have had a value for SQ_CFG_OPERATION. |
| * 5. Add HDFS connector for jobs to reference. |
| * 6. Set 'input' and 'output' configs connector. |
| * to HDFS connector. |
| * 7. Throttling config was originally the second config in |
| * the framework. It should now be the first config. |
| * 8. Remove the EXPORT throttling config and ensure all of |
| * its dependencies point to the IMPORT throttling config. |
| * Then make sure the throttling config does not have a direction. |
| * Framework configs should not have a direction. |
| * 9. Create an HDFS link to reference and update |
| * jobs to reference that link. IMPORT jobs |
| * should have TO HDFS connector, EXPORT jobs should have |
| * FROM HDFS connector. |
| * 10. Update 'table' config names to 'fromJobConfig' and 'toTable'. |
| * Also update the relevant inputs as well. |
| * @param conn |
| */ |
| private void updteJobInternals(Connection conn, long connectorId) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Updating existing data for generic connectors."); |
| } |
| |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn, |
| Direction.FROM.toString(), "IMPORT"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn, |
| Direction.TO.toString(), "EXPORT"); |
| |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR_HDFS_CONFIG_DIRECTION, conn, |
| Direction.FROM.toString(), |
| "input"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR_HDFS_CONFIG_DIRECTION, conn, |
| Direction.TO.toString(), |
| "output"); |
| |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR, conn, |
| new Long(connectorId), "input", "output"); |
| |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_CONFIG_INPUTS, conn, |
| "IMPORT", "EXPORT"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_CONFIG_INPUTS, conn, |
| "throttling", "EXPORT"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_DRIVER_CONFIG, conn, |
| "throttling", "EXPORT"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DIRECTION_TO_NULL, conn, |
| "throttling"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DRIVER_INDEX, conn, |
| new Long(0), "throttling"); |
| |
| Long linkId = createHdfsLink(conn, connectorId); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK_COPY_SQB_FROM_LINK, conn, |
| "EXPORT"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_LINK, conn, |
| new Long(linkId), "EXPORT"); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK, conn, |
| new Long(linkId), "IMPORT"); |
| |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME, conn, |
| "fromJobConfig", "table", Direction.FROM.toString()); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_TABLE_INPUT_NAMES, conn, |
| Direction.FROM.toString().toLowerCase(), "fromJobConfig", Direction.FROM.toString()); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME, conn, |
| "toJobConfig", "table", Direction.TO.toString()); |
| runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_TABLE_INPUT_NAMES, conn, |
| Direction.TO.toString().toLowerCase(), "toJobConfig", Direction.TO.toString()); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Updated existing data for generic connectors."); |
| } |
| } |
| |
| /** |
| * Pre-register HDFS Connector so that config upgrade will work. |
| */ |
| protected long registerHdfsConnector(Connection conn) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Begin HDFS Connector pre-loading."); |
| } |
| |
| List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs(); |
| |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Connector configs: " + connectorConfigs); |
| } |
| |
| ConnectorHandler handler = null; |
| for (URL url : connectorConfigs) { |
| handler = new ConnectorHandler(url); |
| |
| if (handler.getMetadata().getPersistenceId() != -1) { |
| return handler.getMetadata().getPersistenceId(); |
| } |
| |
| if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { |
| try { |
| PreparedStatement baseConnectorStmt = conn.prepareStatement( |
| STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, |
| Statement.RETURN_GENERATED_KEYS); |
| baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName()); |
| baseConnectorStmt.setString(2, handler.getMetadata().getClassName()); |
| baseConnectorStmt.setString(3, "0"); |
| if (baseConnectorStmt.executeUpdate() == 1) { |
| ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys(); |
| if (rsetConnectorId.next()) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("HDFS Connector pre-loaded: " + rsetConnectorId.getLong(1)); |
| } |
| return rsetConnectorId.getLong(1); |
| } |
| } |
| } catch (SQLException e) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| break; |
| } |
| } |
| |
| return -1L; |
| } |
| |
| /** |
| * Create an HDFS link. |
| * Intended to be used when moving HDFS connector out of driverConfig |
| * to its own connector. |
| * |
| * NOTE: Upgrade path only! |
| */ |
| private Long createHdfsLink(Connection conn, Long connectorId) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Creating HDFS link."); |
| } |
| |
| PreparedStatement stmt = null; |
| int result; |
| try { |
| stmt = conn.prepareStatement(STMT_INSERT_LINK, |
| Statement.RETURN_GENERATED_KEYS); |
| stmt.setString(1, LINK_HDFS); |
| stmt.setLong(2, connectorId); |
| stmt.setBoolean(3, true); |
| stmt.setNull(4, Types.VARCHAR); |
| stmt.setTimestamp(5, new Timestamp(System.currentTimeMillis())); |
| stmt.setNull(6, Types.VARCHAR); |
| stmt.setTimestamp(7, new Timestamp(System.currentTimeMillis())); |
| |
| result = stmt.executeUpdate(); |
| if (result != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0012, |
| Integer.toString(result)); |
| } |
| |
| ResultSet rsetConnectionId = stmt.getGeneratedKeys(); |
| |
| if (!rsetConnectionId.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Created HDFS link."); |
| } |
| |
| return rsetConnectionId.getLong(1); |
| } catch (SQLException ex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean haveSuitableInternals(Connection conn) { |
| int version = detectVersion(conn); |
| |
| if(version != DerbyRepoConstants.VERSION) { |
| return false; |
| } |
| |
| // TODO(jarcec): Verify that all structures are present (e.g. something like corruption validation) |
| return true; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public MConnector findConnector(String shortName, Connection conn) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Looking up connector: " + shortName); |
| } |
| MConnector mc = null; |
| PreparedStatement baseConnectorFetchStmt = null; |
| try { |
| baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR); |
| baseConnectorFetchStmt.setString(1, shortName); |
| |
| List<MConnector> connectors = loadConnectors(baseConnectorFetchStmt,conn); |
| |
| if (connectors.size()==0) { |
| LOG.debug("No connector found by name: " + shortName); |
| return null; |
| } else if (connectors.size()==1) { |
| LOG.debug("Looking up connector: " + shortName + ", found: " + mc); |
| return connectors.get(0); |
| } |
| else { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName); |
| } |
| |
| } catch (SQLException ex) { |
| logException(ex, shortName); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex); |
| } finally { |
| closeStatements(baseConnectorFetchStmt); |
| } |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<MConnector> findConnectors(Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_CONNECTOR_ALL); |
| return loadConnectors(stmt,conn); |
| } catch (SQLException ex) { |
| logException(ex); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0045, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void registerDriver(MDriver mDriver, Connection conn) { |
| if (mDriver.hasPersistenceId()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0011, |
| "Driver"); |
| } |
| |
| PreparedStatement baseConfigStmt = null; |
| PreparedStatement baseInputStmt = null; |
| try { |
| baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, |
| Statement.RETURN_GENERATED_KEYS); |
| baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, |
| Statement.RETURN_GENERATED_KEYS); |
| |
| // Register a driver config as a job type with no owner/connector and direction |
| registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(), |
| MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); |
| |
| // We're using hardcoded value for driver config as they are |
| // represented as NULL in the database. |
| mDriver.setPersistenceId(1); |
| } catch (SQLException ex) { |
| logException(ex, mDriver); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0014, ex); |
| } finally { |
| closeStatements(baseConfigStmt, baseInputStmt); |
| } |
| createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public MDriver findDriver(Connection conn) { |
| LOG.debug("Looking up Driver config to create a driver "); |
| MDriver mDriver = null; |
| PreparedStatement driverConfigFetchStmt = null; |
| PreparedStatement driverConfigInputFetchStmt = null; |
| try { |
| driverConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER); |
| driverConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); |
| List<MConfig> driverConfigs = new ArrayList<MConfig>(); |
| loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1); |
| |
| if(driverConfigs.isEmpty()) { |
| return null; |
| } |
| |
| mDriver = new MDriver(new MDriverConfig(driverConfigs), detectDriverVersion(conn)); |
| mDriver.setPersistenceId(1); |
| |
| } catch (SQLException ex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0004, |
| "Driver config", ex); |
| } finally { |
| if (driverConfigFetchStmt != null) { |
| try { |
| driverConfigFetchStmt.close(); |
| } catch (SQLException ex) { |
| LOG.error("Unable to close config fetch statement", ex); |
| } |
| } |
| if (driverConfigInputFetchStmt != null) { |
| try { |
| driverConfigInputFetchStmt.close(); |
| } catch (SQLException ex) { |
| LOG.error("Unable to close input fetch statement", ex); |
| } |
| } |
| } |
| |
| LOG.debug("Looking up Driver config and created driver:" + mDriver); |
| return mDriver; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public String validationQuery() { |
| return "values(1)"; // Yes, this is valid derby SQL |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createLink(MLink link, Connection conn) { |
| PreparedStatement stmt = null; |
| int result; |
| try { |
| stmt = conn.prepareStatement(STMT_INSERT_LINK, |
| Statement.RETURN_GENERATED_KEYS); |
| stmt.setString(1, link.getName()); |
| stmt.setLong(2, link.getConnectorId()); |
| stmt.setBoolean(3, link.getEnabled()); |
| stmt.setString(4, link.getCreationUser()); |
| stmt.setTimestamp(5, new Timestamp(link.getCreationDate().getTime())); |
| stmt.setString(6, link.getLastUpdateUser()); |
| stmt.setTimestamp(7, new Timestamp(link.getLastUpdateDate().getTime())); |
| |
| result = stmt.executeUpdate(); |
| if (result != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0012, |
| Integer.toString(result)); |
| } |
| |
| ResultSet rsetConnectionId = stmt.getGeneratedKeys(); |
| |
| if (!rsetConnectionId.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| long connectionId = rsetConnectionId.getLong(1); |
| |
| createInputValues(STMT_INSERT_LINK_INPUT, |
| connectionId, |
| link.getConnectorLinkConfig().getConfigs(), |
| conn); |
| link.setPersistenceId(connectionId); |
| |
| } catch (SQLException ex) { |
| logException(ex, link); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void updateLink(MLink link, Connection conn) { |
| PreparedStatement deleteStmt = null; |
| PreparedStatement updateStmt = null; |
| try { |
| // Firstly remove old values |
| deleteStmt = conn.prepareStatement(STMT_DELETE_LINK_INPUT); |
| deleteStmt.setLong(1, link.getPersistenceId()); |
| deleteStmt.executeUpdate(); |
| |
| // Update LINK_CONFIG table |
| updateStmt = conn.prepareStatement(STMT_UPDATE_LINK); |
| updateStmt.setString(1, link.getName()); |
| updateStmt.setString(2, link.getLastUpdateUser()); |
| updateStmt.setTimestamp(3, new Timestamp(new Date().getTime())); |
| |
| updateStmt.setLong(4, link.getPersistenceId()); |
| updateStmt.executeUpdate(); |
| |
| // And reinsert new values |
| createInputValues(STMT_INSERT_LINK_INPUT, |
| link.getPersistenceId(), |
| link.getConnectorLinkConfig().getConfigs(), |
| conn); |
| |
| } catch (SQLException ex) { |
| logException(ex, link); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0021, ex); |
| } finally { |
| closeStatements(deleteStmt, updateStmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean existsLink(long id, Connection conn) { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_LINK_CHECK_BY_ID); |
| stmt.setLong(1, id); |
| rs = stmt.executeQuery(); |
| |
| // Should be always valid in query with count |
| rs.next(); |
| |
| return rs.getLong(1) == 1; |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0025, ex); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| @Override |
| public boolean inUseLink(long connectionId, Connection conn) { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_JOBS_FOR_LINK_CHECK); |
| stmt.setLong(1, connectionId); |
| rs = stmt.executeQuery(); |
| |
| // Should be always valid in case of count(*) query |
| rs.next(); |
| |
| return rs.getLong(1) != 0; |
| |
| } catch (SQLException e) { |
| logException(e, connectionId); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0032, e); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| @Override |
| public void enableLink(long connectionId, boolean enabled, Connection conn) { |
| PreparedStatement enableConn = null; |
| |
| try { |
| enableConn = conn.prepareStatement(STMT_ENABLE_LINK); |
| enableConn.setBoolean(1, enabled); |
| enableConn.setLong(2, connectionId); |
| enableConn.executeUpdate(); |
| } catch (SQLException ex) { |
| logException(ex, connectionId); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0042, ex); |
| } finally { |
| closeStatements(enableConn); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void deleteLink(long id, Connection conn) { |
| PreparedStatement dltConn = null; |
| |
| try { |
| deleteLinkInputs(id, conn); |
| dltConn = conn.prepareStatement(STMT_DELETE_LINK); |
| dltConn.setLong(1, id); |
| dltConn.executeUpdate(); |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex); |
| } finally { |
| closeStatements(dltConn); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void deleteLinkInputs(long id, Connection conn) { |
| PreparedStatement dltConnInput = null; |
| try { |
| dltConnInput = conn.prepareStatement(STMT_DELETE_LINK_INPUT); |
| dltConnInput.setLong(1, id); |
| dltConnInput.executeUpdate(); |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex); |
| } finally { |
| closeStatements(dltConnInput); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public MLink findLink(long id, Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_LINK_SINGLE); |
| stmt.setLong(1, id); |
| |
| List<MLink> connections = loadLinks(stmt, conn); |
| |
| if(connections.size() != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0024, "Couldn't find" |
| + " link with id " + id); |
| } |
| |
| // Return the first and only one link object |
| return connections.get(0); |
| |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<MLink> findLinks(Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_LINK_ALL); |
| |
| return loadLinks(stmt, conn); |
| |
| } catch (SQLException ex) { |
| logException(ex); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| |
| /** |
| * |
| * {@inheritDoc} |
| * |
| */ |
| @Override |
| public List<MLink> findLinksForConnector(long connectorID, Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR); |
| stmt.setLong(1, connectorID); |
| |
| return loadLinks(stmt, conn); |
| |
| } catch (SQLException ex) { |
| logException(ex, connectorID); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void updateConnector(MConnector mConnector, Connection conn) { |
| PreparedStatement updateConnectorStatement = null; |
| PreparedStatement deleteConfig = null; |
| PreparedStatement deleteInput = null; |
| try { |
| updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR); |
| deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR); |
| deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONNECTOR); |
| updateConnectorStatement.setString(1, mConnector.getUniqueName()); |
| updateConnectorStatement.setString(2, mConnector.getClassName()); |
| updateConnectorStatement.setString(3, mConnector.getVersion()); |
| updateConnectorStatement.setLong(4, mConnector.getPersistenceId()); |
| |
| if (updateConnectorStatement.executeUpdate() != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0038); |
| } |
| deleteInput.setLong(1, mConnector.getPersistenceId()); |
| deleteConfig.setLong(1, mConnector.getPersistenceId()); |
| deleteInput.executeUpdate(); |
| deleteConfig.executeUpdate(); |
| |
| } catch (SQLException e) { |
| logException(e, mConnector); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e); |
| } finally { |
| closeStatements(updateConnectorStatement, deleteConfig, deleteInput); |
| } |
| insertConfigsForConnector(mConnector, conn); |
| |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void updateDriver(MDriver mDriver, Connection conn) { |
| PreparedStatement deleteConfig = null; |
| PreparedStatement deleteInput = null; |
| try { |
| deleteInput = conn.prepareStatement(STMT_DELETE_DRIVER_INPUTS); |
| deleteConfig = conn.prepareStatement(STMT_DELETE_DRIVER_CONFIGS); |
| |
| deleteInput.executeUpdate(); |
| deleteConfig.executeUpdate(); |
| |
| } catch (SQLException e) { |
| logException(e, mDriver); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e); |
| } finally { |
| closeStatements(deleteConfig, deleteInput); |
| } |
| createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); |
| insertConfigsForDriver(mDriver, conn); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createJob(MJob job, Connection conn) { |
| PreparedStatement stmt = null; |
| int result; |
| try { |
| stmt = conn.prepareStatement(STMT_INSERT_JOB, Statement.RETURN_GENERATED_KEYS); |
| stmt.setString(1, job.getName()); |
| stmt.setLong(2, job.getLinkId(Direction.FROM)); |
| stmt.setLong(3, job.getLinkId(Direction.TO)); |
| stmt.setBoolean(4, job.getEnabled()); |
| stmt.setString(5, job.getCreationUser()); |
| stmt.setTimestamp(6, new Timestamp(job.getCreationDate().getTime())); |
| stmt.setString(7, job.getLastUpdateUser()); |
| stmt.setTimestamp(8, new Timestamp(job.getLastUpdateDate().getTime())); |
| |
| result = stmt.executeUpdate(); |
| if (result != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0012, |
| Integer.toString(result)); |
| } |
| |
| ResultSet rsetJobId = stmt.getGeneratedKeys(); |
| |
| if (!rsetJobId.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| long jobId = rsetJobId.getLong(1); |
| |
| // from config for the job |
| createInputValues(STMT_INSERT_JOB_INPUT, |
| jobId, |
| job.getJobConfig(Direction.FROM).getConfigs(), |
| conn); |
| // to config for the job |
| createInputValues(STMT_INSERT_JOB_INPUT, |
| jobId, |
| job.getJobConfig(Direction.TO).getConfigs(), |
| conn); |
| // driver config per job |
| createInputValues(STMT_INSERT_JOB_INPUT, |
| jobId, |
| job.getDriverConfig().getConfigs(), |
| conn); |
| |
| job.setPersistenceId(jobId); |
| |
| } catch (SQLException ex) { |
| logException(ex, job); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0026, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void updateJob(MJob job, Connection conn) { |
| PreparedStatement deleteStmt = null; |
| PreparedStatement updateStmt = null; |
| try { |
| // Firstly remove old values |
| deleteStmt = conn.prepareStatement(STMT_DELETE_JOB_INPUT); |
| deleteStmt.setLong(1, job.getPersistenceId()); |
| deleteStmt.executeUpdate(); |
| |
| // Update job table |
| updateStmt = conn.prepareStatement(STMT_UPDATE_JOB); |
| updateStmt.setString(1, job.getName()); |
| updateStmt.setString(2, job.getLastUpdateUser()); |
| updateStmt.setTimestamp(3, new Timestamp(new Date().getTime())); |
| |
| updateStmt.setLong(4, job.getPersistenceId()); |
| updateStmt.executeUpdate(); |
| |
| // And reinsert new values |
| createInputValues(STMT_INSERT_JOB_INPUT, |
| job.getPersistenceId(), |
| job.getJobConfig(Direction.FROM).getConfigs(), |
| conn); |
| createInputValues(STMT_INSERT_JOB_INPUT, |
| job.getPersistenceId(), |
| job.getJobConfig(Direction.TO).getConfigs(), |
| conn); |
| createInputValues(STMT_INSERT_JOB_INPUT, |
| job.getPersistenceId(), |
| job.getDriverConfig().getConfigs(), |
| conn); |
| |
| } catch (SQLException ex) { |
| logException(ex, job); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0027, ex); |
| } finally { |
| closeStatements(deleteStmt, updateStmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean existsJob(long id, Connection conn) { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_JOB_CHECK_BY_ID); |
| stmt.setLong(1, id); |
| rs = stmt.executeQuery(); |
| |
| // Should be always valid in query with count |
| rs.next(); |
| |
| return rs.getLong(1) == 1; |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0029, ex); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| @Override |
| public boolean inUseJob(long jobId, Connection conn) { |
| MSubmission submission = findSubmissionLastForJob(jobId, conn); |
| |
| // We have no submissions and thus job can't be in use |
| if(submission == null) { |
| return false; |
| } |
| |
| // We can't remove running job |
| if(submission.getStatus().isRunning()) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public void enableJob(long jobId, boolean enabled, Connection conn) { |
| PreparedStatement enableConn = null; |
| |
| try { |
| enableConn = conn.prepareStatement(STMT_ENABLE_JOB); |
| enableConn.setBoolean(1, enabled); |
| enableConn.setLong(2, jobId); |
| enableConn.executeUpdate(); |
| } catch (SQLException ex) { |
| logException(ex, jobId); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0043, ex); |
| } finally { |
| closeStatements(enableConn); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void deleteJob(long id, Connection conn) { |
| PreparedStatement dlt = null; |
| try { |
| deleteJobInputs(id, conn); |
| dlt = conn.prepareStatement(STMT_DELETE_JOB); |
| dlt.setLong(1, id); |
| dlt.executeUpdate(); |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex); |
| } finally { |
| closeStatements(dlt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void deleteJobInputs(long id, Connection conn) { |
| PreparedStatement dltInput = null; |
| try { |
| dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT); |
| dltInput.setLong(1, id); |
| dltInput.executeUpdate(); |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex); |
| } finally { |
| closeStatements(dltInput); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public MJob findJob(long id, Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE_BY_ID); |
| stmt.setLong(1, id); |
| |
| List<MJob> jobs = loadJobs(stmt, conn); |
| |
| if(jobs.size() != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0030, "Couldn't find" |
| + " job with id " + id); |
| } |
| |
| // Return the first and only one link object |
| return jobs.get(0); |
| |
| } catch (SQLException ex) { |
| logException(ex, id); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<MJob> findJobs(Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_JOB); |
| |
| return loadJobs(stmt, conn); |
| |
| } catch (SQLException ex) { |
| logException(ex); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<MJob> findJobsForConnector(long connectorId, Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR); |
| stmt.setLong(1, connectorId); |
| stmt.setLong(2, connectorId); |
| return loadJobs(stmt, conn); |
| |
| } catch (SQLException ex) { |
| logException(ex, connectorId); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createSubmission(MSubmission submission, Connection conn) { |
| PreparedStatement stmt = null; |
| int result; |
| try { |
| stmt = conn.prepareStatement(STMT_INSERT_SUBMISSION, |
| Statement.RETURN_GENERATED_KEYS); |
| stmt.setLong(1, submission.getJobId()); |
| stmt.setString(2, submission.getStatus().name()); |
| stmt.setString(3, submission.getCreationUser()); |
| stmt.setTimestamp(4, new Timestamp(submission.getCreationDate().getTime())); |
| stmt.setString(5, submission.getLastUpdateUser()); |
| stmt.setTimestamp(6, new Timestamp(submission.getLastUpdateDate().getTime())); |
| stmt.setString(7, submission.getExternalId()); |
| stmt.setString(8, submission.getExternalLink()); |
| stmt.setString(9, submission.getExceptionInfo()); |
| stmt.setString(10, submission.getExceptionStackTrace()); |
| |
| result = stmt.executeUpdate(); |
| if (result != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0012, |
| Integer.toString(result)); |
| } |
| |
| ResultSet rsetSubmissionId = stmt.getGeneratedKeys(); |
| |
| if (!rsetSubmissionId.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| long submissionId = rsetSubmissionId.getLong(1); |
| |
| if(submission.getCounters() != null) { |
| createSubmissionCounters(submissionId, submission.getCounters(), conn); |
| } |
| |
| // Save created persistence id |
| submission.setPersistenceId(submissionId); |
| |
| } catch (SQLException ex) { |
| logException(ex, submission); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0034, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean existsSubmission(long submissionId, Connection conn) { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_SUBMISSION_CHECK); |
| stmt.setLong(1, submissionId); |
| rs = stmt.executeQuery(); |
| |
| // Should be always valid in query with count |
| rs.next(); |
| |
| return rs.getLong(1) == 1; |
| } catch (SQLException ex) { |
| logException(ex, submissionId); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0033, ex); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void updateSubmission(MSubmission submission, Connection conn) { |
| PreparedStatement stmt = null; |
| PreparedStatement deleteStmt = null; |
| try { |
| // Update properties in main table |
| stmt = conn.prepareStatement(STMT_UPDATE_SUBMISSION); |
| stmt.setString(1, submission.getStatus().name()); |
| stmt.setString(2, submission.getLastUpdateUser()); |
| stmt.setTimestamp(3, new Timestamp(submission.getLastUpdateDate().getTime())); |
| stmt.setString(4, submission.getExceptionInfo()); |
| stmt.setString(5, submission.getExceptionStackTrace()); |
| |
| stmt.setLong(6, submission.getPersistenceId()); |
| stmt.executeUpdate(); |
| |
| // Delete previous counters |
| deleteStmt = conn.prepareStatement(STMT_DELETE_COUNTER_SUBMISSION); |
| deleteStmt.setLong(1, submission.getPersistenceId()); |
| deleteStmt.executeUpdate(); |
| |
| // Reinsert new counters if needed |
| if(submission.getCounters() != null) { |
| createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn); |
| } |
| |
| } catch (SQLException ex) { |
| logException(ex, submission); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0035, ex); |
| } finally { |
| closeStatements(stmt, deleteStmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void purgeSubmissions(Date threshold, Connection conn) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_PURGE_SUBMISSIONS); |
| stmt.setTimestamp(1, new Timestamp(threshold.getTime())); |
| stmt.executeUpdate(); |
| |
| } catch (SQLException ex) { |
| logException(ex, threshold); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0036, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<MSubmission> findSubmissionsUnfinished(Connection conn) { |
| List<MSubmission> submissions = new LinkedList<MSubmission>(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_SUBMISSION_UNFINISHED); |
| |
| for(SubmissionStatus status : SubmissionStatus.unfinished()) { |
| stmt.setString(1, status.name()); |
| rs = stmt.executeQuery(); |
| |
| while(rs.next()) { |
| submissions.add(loadSubmission(rs, conn)); |
| } |
| |
| rs.close(); |
| rs = null; |
| } |
| } catch (SQLException ex) { |
| logException(ex); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0037, ex); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| |
| return submissions; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public List<MSubmission> findSubmissions(Connection conn) { |
| List<MSubmission> submissions = new LinkedList<MSubmission>(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_SUBMISSIONS); |
| rs = stmt.executeQuery(); |
| |
| while(rs.next()) { |
| submissions.add(loadSubmission(rs, conn)); |
| } |
| |
| rs.close(); |
| rs = null; |
| } catch (SQLException ex) { |
| logException(ex); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0039, ex); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| |
| return submissions; |
| } |
| |
| @Override |
| public List<MSubmission> findSubmissionsForJob(long jobId, Connection conn) { |
| List<MSubmission> submissions = new LinkedList<MSubmission>(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_SUBMISSIONS_FOR_JOB); |
| stmt.setLong(1, jobId); |
| rs = stmt.executeQuery(); |
| |
| while(rs.next()) { |
| submissions.add(loadSubmission(rs, conn)); |
| } |
| |
| rs.close(); |
| rs = null; |
| } catch (SQLException ex) { |
| logException(ex); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0040, ex); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| |
| return submissions; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public MSubmission findSubmissionLastForJob(long jobId, Connection conn) { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_SUBMISSIONS_FOR_JOB); |
| stmt.setLong(1, jobId); |
| stmt.setMaxRows(1); |
| rs = stmt.executeQuery(); |
| |
| if(!rs.next()) { |
| return null; |
| } |
| |
| return loadSubmission(rs, conn); |
| } catch (SQLException ex) { |
| logException(ex, jobId); |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0040, ex); |
| } finally { |
| closeResultSets(rs); |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Stores counters for given submission in repository. |
| * |
| * @param submissionId Submission id |
| * @param counters Counters that should be stored |
| * @param conn Connection to derby repository |
| * @throws SQLException |
| */ |
| private void createSubmissionCounters(long submissionId, Counters counters, Connection conn) throws SQLException { |
| PreparedStatement stmt = null; |
| |
| try { |
| stmt = conn.prepareStatement(STMT_INSERT_COUNTER_SUBMISSION); |
| |
| for(CounterGroup group : counters) { |
| long groupId = getCounterGroupId(group, conn); |
| |
| for(Counter counter: group) { |
| long counterId = getCounterId(counter, conn); |
| |
| stmt.setLong(1, groupId); |
| stmt.setLong(2, counterId); |
| stmt.setLong(3, submissionId); |
| stmt.setLong(4, counter.getValue()); |
| |
| stmt.executeUpdate(); |
| } |
| } |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Resolves counter group database id. |
| * |
| * @param group Given group |
| * @param conn Connection to database |
| * @return Id |
| * @throws SQLException |
| */ |
| private long getCounterGroupId(CounterGroup group, Connection conn) throws SQLException { |
| PreparedStatement select = null; |
| PreparedStatement insert = null; |
| ResultSet rsSelect = null; |
| ResultSet rsInsert = null; |
| |
| try { |
| select = conn.prepareStatement(STMT_SELECT_COUNTER_GROUP); |
| select.setString(1, group.getName()); |
| |
| rsSelect = select.executeQuery(); |
| |
| if(rsSelect.next()) { |
| return rsSelect.getLong(1); |
| } |
| |
| insert = conn.prepareStatement(STMT_INSERT_COUNTER_GROUP, Statement.RETURN_GENERATED_KEYS); |
| insert.setString(1, group.getName()); |
| insert.executeUpdate(); |
| |
| rsInsert = insert.getGeneratedKeys(); |
| |
| if (!rsInsert.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| return rsInsert.getLong(1); |
| } finally { |
| closeResultSets(rsSelect, rsInsert); |
| closeStatements(select, insert); |
| } |
| } |
| |
| /** |
| * Resolves counter id. |
| * |
| * @param counter Given counter |
| * @param conn Connection to database |
| * @return Id |
| * @throws SQLException |
| */ |
| private long getCounterId(Counter counter, Connection conn) throws SQLException { |
| PreparedStatement select = null; |
| PreparedStatement insert = null; |
| ResultSet rsSelect = null; |
| ResultSet rsInsert = null; |
| |
| try { |
| select = conn.prepareStatement(STMT_SELECT_COUNTER); |
| select.setString(1, counter.getName()); |
| |
| rsSelect = select.executeQuery(); |
| |
| if(rsSelect.next()) { |
| return rsSelect.getLong(1); |
| } |
| |
| insert = conn.prepareStatement(STMT_INSERT_COUNTER, Statement.RETURN_GENERATED_KEYS); |
| insert.setString(1, counter.getName()); |
| insert.executeUpdate(); |
| |
| rsInsert = insert.getGeneratedKeys(); |
| |
| if (!rsInsert.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0013); |
| } |
| |
| return rsInsert.getLong(1); |
| } finally { |
| closeResultSets(rsSelect, rsInsert); |
| closeStatements(select, insert); |
| } |
| } |
| |
| /** |
| * Create MSubmission structure from result set. |
| * |
| * @param rs Result set, only active row will be fetched |
| * @param conn Connection to database |
| * @return Created MSubmission structure |
| * @throws SQLException |
| */ |
| private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLException { |
| MSubmission submission = new MSubmission(); |
| |
| submission.setPersistenceId(rs.getLong(1)); |
| submission.setJobId(rs.getLong(2)); |
| submission.setStatus(SubmissionStatus.valueOf(rs.getString(3))); |
| submission.setCreationUser(rs.getString(4)); |
| submission.setCreationDate(rs.getTimestamp(5)); |
| submission.setLastUpdateUser(rs.getString(6)); |
| submission.setLastUpdateDate(rs.getTimestamp(7)); |
| submission.setExternalId(rs.getString(8)); |
| submission.setExternalLink(rs.getString(9)); |
| submission.setExceptionInfo(rs.getString(10)); |
| submission.setExceptionStackTrace(rs.getString(11)); |
| |
| Counters counters = loadCountersSubmission(rs.getLong(1), conn); |
| submission.setCounters(counters); |
| |
| return submission; |
| } |
| |
| private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_COUNTER_SUBMISSION); |
| stmt.setLong(1, submissionId); |
| rs = stmt.executeQuery(); |
| |
| Counters counters = new Counters(); |
| |
| while (rs.next()) { |
| String groupName = rs.getString(1); |
| String counterName = rs.getString(2); |
| long value = rs.getLong(3); |
| |
| CounterGroup group = counters.getCounterGroup(groupName); |
| if (group == null) { |
| group = new CounterGroup(groupName); |
| counters.addCounterGroup(group); |
| } |
| |
| group.addCounter(new Counter(counterName, value)); |
| } |
| |
| if (counters.isEmpty()) { |
| return null; |
| } else { |
| return counters; |
| } |
| } finally { |
| closeStatements(stmt); |
| closeResultSets(rs); |
| } |
| } |
| |
| private Long getDirection(Direction direction, Connection conn) throws SQLException { |
| PreparedStatement directionStmt = null; |
| ResultSet rs = null; |
| |
| try { |
| directionStmt = conn.prepareStatement(STMT_SELECT_SQD_ID_BY_SQD_NAME); |
| directionStmt.setString(1, direction.toString()); |
| rs = directionStmt.executeQuery(); |
| |
| rs.next(); |
| return rs.getLong(1); |
| } finally { |
| if (rs != null) { |
| closeResultSets(rs); |
| } |
| if (directionStmt != null) { |
| closeStatements(directionStmt); |
| } |
| } |
| } |
| |
| private Direction getDirection(long directionId, Connection conn) throws SQLException { |
| PreparedStatement directionStmt = null; |
| ResultSet rs = null; |
| |
| try { |
| directionStmt = conn.prepareStatement(STMT_SELECT_SQD_NAME_BY_SQD_ID); |
| directionStmt.setLong(1, directionId); |
| rs = directionStmt.executeQuery(); |
| |
| rs.next(); |
| return Direction.valueOf(rs.getString(1)); |
| } finally { |
| if (rs != null) { |
| closeResultSets(rs); |
| } |
| if (directionStmt != null) { |
| closeStatements(directionStmt); |
| } |
| } |
| } |
| |
| private SupportedDirections findConnectorSupportedDirections(long connectorId, Connection conn) throws SQLException { |
| PreparedStatement connectorDirectionsStmt = null; |
| ResultSet rs = null; |
| |
| boolean from = false, to = false; |
| |
| try { |
| connectorDirectionsStmt = conn.prepareStatement(STMT_SELECT_SQ_CONNECTOR_DIRECTIONS); |
| connectorDirectionsStmt.setLong(1, connectorId); |
| rs = connectorDirectionsStmt.executeQuery(); |
| |
| while(rs.next()) { |
| switch(getDirection(rs.getLong(2), conn)) { |
| case FROM: |
| from = true; |
| break; |
| |
| case TO: |
| to = true; |
| break; |
| } |
| } |
| } finally { |
| if (rs != null) { |
| closeResultSets(rs); |
| } |
| if (connectorDirectionsStmt != null) { |
| closeStatements(connectorDirectionsStmt); |
| } |
| } |
| |
| return new SupportedDirections(from, to); |
| } |
| |
| private List<MConnector> loadConnectors(PreparedStatement stmt, Connection conn) throws SQLException { |
| List<MConnector> connectors = new ArrayList<MConnector>(); |
| ResultSet rsConnectors = null; |
| PreparedStatement connectorConfigFetchStmt = null; |
| PreparedStatement connectorConfigInputFetchStmt = null; |
| |
| try { |
| rsConnectors = stmt.executeQuery(); |
| connectorConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); |
| connectorConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); |
| |
| while(rsConnectors.next()) { |
| long connectorId = rsConnectors.getLong(1); |
| String connectorName = rsConnectors.getString(2); |
| String connectorClassName = rsConnectors.getString(3); |
| String connectorVersion = rsConnectors.getString(4); |
| |
| connectorConfigFetchStmt.setLong(1, connectorId); |
| |
| List<MConfig> linkConfig = new ArrayList<MConfig>(); |
| List<MConfig> fromConfig = new ArrayList<MConfig>(); |
| List<MConfig> toConfig = new ArrayList<MConfig>(); |
| |
| loadConfigTypes(linkConfig, fromConfig, toConfig, connectorConfigFetchStmt, |
| connectorConfigInputFetchStmt, 1, conn); |
| |
| SupportedDirections supportedDirections |
| = findConnectorSupportedDirections(connectorId, conn); |
| MFromConfig fromJobConfig = null; |
| MToConfig toJobConfig = null; |
| if (supportedDirections.isDirectionSupported(Direction.FROM)) { |
| fromJobConfig = new MFromConfig(fromConfig); |
| } |
| if (supportedDirections.isDirectionSupported(Direction.TO)) { |
| toJobConfig = new MToConfig(toConfig); |
| } |
| MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion, |
| new MLinkConfig(linkConfig), fromJobConfig, toJobConfig); |
| mc.setPersistenceId(connectorId); |
| |
| connectors.add(mc); |
| } |
| } finally { |
| closeResultSets(rsConnectors); |
| closeStatements(connectorConfigFetchStmt,connectorConfigInputFetchStmt); |
| } |
| |
| return connectors; |
| } |
| |
| private List<MLink> loadLinks(PreparedStatement stmt, |
| Connection conn) |
| throws SQLException { |
| List<MLink> links = new ArrayList<MLink>(); |
| ResultSet rsConnection = null; |
| PreparedStatement connectorConfigFetchStatement = null; |
| PreparedStatement connectorConfigInputStatement = null; |
| |
| try { |
| rsConnection = stmt.executeQuery(); |
| |
| // |
| connectorConfigFetchStatement = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); |
| connectorConfigInputStatement = conn.prepareStatement(STMT_FETCH_LINK_INPUT); |
| |
| while(rsConnection.next()) { |
| long id = rsConnection.getLong(1); |
| String name = rsConnection.getString(2); |
| long connectorId = rsConnection.getLong(3); |
| boolean enabled = rsConnection.getBoolean(4); |
| String creationUser = rsConnection.getString(5); |
| Date creationDate = rsConnection.getTimestamp(6); |
| String updateUser = rsConnection.getString(7); |
| Date lastUpdateDate = rsConnection.getTimestamp(8); |
| |
| connectorConfigFetchStatement.setLong(1, connectorId); |
| connectorConfigInputStatement.setLong(1, id); |
| connectorConfigInputStatement.setLong(3, id); |
| |
| List<MConfig> connectorLinkConfig = new ArrayList<MConfig>(); |
| List<MConfig> fromConfig = new ArrayList<MConfig>(); |
| List<MConfig> toConfig = new ArrayList<MConfig>(); |
| |
| loadConfigTypes(connectorLinkConfig, fromConfig, toConfig, connectorConfigFetchStatement, |
| connectorConfigInputStatement, 2, conn); |
| MLink link = new MLink(connectorId, new MLinkConfig(connectorLinkConfig)); |
| |
| link.setPersistenceId(id); |
| link.setName(name); |
| link.setCreationUser(creationUser); |
| link.setCreationDate(creationDate); |
| link.setLastUpdateUser(updateUser); |
| link.setLastUpdateDate(lastUpdateDate); |
| link.setEnabled(enabled); |
| |
| links.add(link); |
| } |
| } finally { |
| closeResultSets(rsConnection); |
| closeStatements(connectorConfigFetchStatement, connectorConfigInputStatement); |
| } |
| |
| return links; |
| } |
| |
| private List<MJob> loadJobs(PreparedStatement stmt, |
| Connection conn) |
| throws SQLException { |
| List<MJob> jobs = new ArrayList<MJob>(); |
| ResultSet rsJob = null; |
| PreparedStatement fromConfigFetchStmt = null; |
| PreparedStatement toConfigFetchStmt = null; |
| PreparedStatement driverConfigfetchStmt = null; |
| PreparedStatement jobInputFetchStmt = null; |
| |
| try { |
| rsJob = stmt.executeQuery(); |
| |
| fromConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); |
| toConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); |
| driverConfigfetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER); |
| jobInputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT); |
| |
| while(rsJob.next()) { |
| // why use connector? why cant it be link id? |
| long fromConnectorId = rsJob.getLong(1); |
| long toConnectorId = rsJob.getLong(2); |
| long id = rsJob.getLong(3); |
| String name = rsJob.getString(4); |
| long fromLinkId = rsJob.getLong(5); |
| long toLinkId = rsJob.getLong(6); |
| boolean enabled = rsJob.getBoolean(7); |
| String createBy = rsJob.getString(8); |
| Date creationDate = rsJob.getTimestamp(9); |
| String updateBy = rsJob.getString(10); |
| Date lastUpdateDate = rsJob.getTimestamp(11); |
| |
| fromConfigFetchStmt.setLong(1, fromConnectorId); |
| toConfigFetchStmt.setLong(1,toConnectorId); |
| |
| jobInputFetchStmt.setLong(1, id); |
| //inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkConfigs |
| jobInputFetchStmt.setLong(3, id); |
| |
| // FROM entity configs |
| List<MConfig> fromConnectorLinkConfig = new ArrayList<MConfig>(); |
| List<MConfig> fromConnectorFromJobConfig = new ArrayList<MConfig>(); |
| List<MConfig> fromConnectorToJobConfig = new ArrayList<MConfig>(); |
| |
| loadConfigTypes(fromConnectorLinkConfig, fromConnectorFromJobConfig, fromConnectorToJobConfig, |
| fromConfigFetchStmt, jobInputFetchStmt, 2, conn); |
| |
| // TO entity configs |
| List<MConfig> toConnectorLinkConfig = new ArrayList<MConfig>(); |
| List<MConfig> toConnectorFromJobConfig = new ArrayList<MConfig>(); |
| List<MConfig> toConnectorToJobConfig = new ArrayList<MConfig>(); |
| |
| // ?? dont we need 2 different driver configs for the from/to? |
| List<MConfig> driverConfig = new ArrayList<MConfig>(); |
| |
| loadConfigTypes(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig, |
| toConfigFetchStmt, jobInputFetchStmt, 2, conn); |
| |
| loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2); |
| |
| MJob job = new MJob( |
| fromConnectorId, toConnectorId, |
| fromLinkId, toLinkId, |
| new MFromConfig(fromConnectorFromJobConfig), |
| new MToConfig(toConnectorToJobConfig), |
| new MDriverConfig(driverConfig)); |
| |
| job.setPersistenceId(id); |
| job.setName(name); |
| job.setCreationUser(createBy); |
| job.setCreationDate(creationDate); |
| job.setLastUpdateUser(updateBy); |
| job.setLastUpdateDate(lastUpdateDate); |
| job.setEnabled(enabled); |
| |
| jobs.add(job); |
| } |
| } finally { |
| closeResultSets(rsJob); |
| closeStatements(fromConfigFetchStmt, toConfigFetchStmt, driverConfigfetchStmt, jobInputFetchStmt); |
| } |
| |
| return jobs; |
| } |
| |
| private void registerConfigDirection(Long configId, Direction direction, Connection conn) |
| throws SQLException { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(STMT_INSERT_SQ_CONFIG_DIRECTIONS); |
| stmt.setLong(1, configId); |
| stmt.setLong(2, getDirection(direction, conn)); |
| if (stmt.executeUpdate() != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0048); |
| } |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Register configs in derby database. This method will insert the ids |
| * generated by the repository into the configs passed in itself. |
| * |
| * Use given prepared statements to create entire config structure in database. |
| * |
| * @param connectorId |
| * @param configs |
| * @param type |
| * @param baseConfigStmt |
| * @param baseInputStmt |
| * @param conn |
| * @return short number of configs registered. |
| * @throws SQLException |
| */ |
| private short registerConfigs(Long connectorId, Direction direction, |
| List<MConfig> configs, String type, PreparedStatement baseConfigStmt, |
| PreparedStatement baseInputStmt, Connection conn) |
| throws SQLException { |
| short configIndex = 0; |
| |
| for (MConfig config : configs) { |
| if(connectorId == null) { |
| baseConfigStmt.setNull(1, Types.BIGINT); |
| } else { |
| baseConfigStmt.setLong(1, connectorId); |
| } |
| |
| baseConfigStmt.setString(2, config.getName()); |
| baseConfigStmt.setString(3, type); |
| baseConfigStmt.setShort(4, configIndex++); |
| |
| int baseConfigCount = baseConfigStmt.executeUpdate(); |
| if (baseConfigCount != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0015, |
| Integer.toString(baseConfigCount)); |
| } |
| ResultSet rsetConfigId = baseConfigStmt.getGeneratedKeys(); |
| if (!rsetConfigId.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0016); |
| } |
| |
| long configId = rsetConfigId.getLong(1); |
| config.setPersistenceId(configId); |
| |
| if (direction != null) { |
| registerConfigDirection(configId, direction, conn); |
| } |
| |
| // Insert all the inputs |
| List<MInput<?>> inputs = config.getInputs(); |
| registerConfigInputs(configId, inputs, baseInputStmt); |
| } |
| return configIndex; |
| } |
| |
| /** |
| * Save given inputs to the database. |
| * |
| * Use given prepare statement to save all inputs into repository. |
| * |
| * @param configId Identifier for corresponding config |
| * @param inputs List of inputs that needs to be saved |
| * @param baseInputStmt Statement that we can utilize |
| * @throws SQLException In case of any failure on Derby side |
| */ |
| private void registerConfigInputs(long configId, List<MInput<?>> inputs, |
| PreparedStatement baseInputStmt) throws SQLException { |
| short inputIndex = 0; |
| for (MInput<?> input : inputs) { |
| baseInputStmt.setString(1, input.getName()); |
| baseInputStmt.setLong(2, configId); |
| baseInputStmt.setShort(3, inputIndex++); |
| baseInputStmt.setString(4, input.getType().name()); |
| baseInputStmt.setBoolean(5, input.isSensitive()); |
| // String specific column(s) |
| if (input.getType().equals(MInputType.STRING)) { |
| MStringInput strInput = (MStringInput) input; |
| baseInputStmt.setShort(6, strInput.getMaxLength()); |
| } else { |
| baseInputStmt.setNull(6, Types.INTEGER); |
| } |
| // Enum specific column(s) |
| if(input.getType() == MInputType.ENUM) { |
| baseInputStmt.setString(7, StringUtils.join(((MEnumInput)input).getValues(), ",")); |
| } else { |
| baseInputStmt.setNull(7, Types.VARCHAR); |
| } |
| |
| int baseInputCount = baseInputStmt.executeUpdate(); |
| if (baseInputCount != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0017, |
| Integer.toString(baseInputCount)); |
| } |
| |
| ResultSet rsetInputId = baseInputStmt.getGeneratedKeys(); |
| if (!rsetInputId.next()) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0018); |
| } |
| |
| long inputId = rsetInputId.getLong(1); |
| input.setPersistenceId(inputId); |
| } |
| } |
| |
| /** |
| * Execute given query on database. |
| * |
| * @param query Query that should be executed |
| */ |
| private void runQuery(String query, Connection conn, Object... args) { |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(query); |
| |
| for (int i = 0; i < args.length; ++i) { |
| if (args[i] instanceof String) { |
| stmt.setString(i + 1, (String)args[i]); |
| } else if (args[i] instanceof Long) { |
| stmt.setLong(i + 1, (Long) args[i]); |
| } else { |
| stmt.setObject(i + 1, args[i]); |
| } |
| } |
| |
| if (stmt.execute()) { |
| ResultSet rset = stmt.getResultSet(); |
| int count = 0; |
| while (rset.next()) { |
| count++; |
| } |
| LOG.info("QUERY(" + query + ") produced unused resultset with "+ count + " rows"); |
| } else { |
| int updateCount = stmt.getUpdateCount(); |
| LOG.info("QUERY(" + query + ") Update count: " + updateCount); |
| } |
| } catch (SQLException ex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0003, query, ex); |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Load configs and corresponding inputs from Derby database. |
| * |
| * Use given prepared statements to load all configs and corresponding inputs |
| * from Derby. |
| * |
| * @param driverConfig List of driver configs that will be filled up |
| * @param configFetchStatement Prepared statement for fetching configs |
| * @param inputFetchStmt Prepare statement for fetching inputs |
| * @param configPosition position of the config |
| * @throws SQLException In case of any failure on Derby side |
| */ |
| public void loadDriverConfigs(List<MConfig> driverConfig, |
| PreparedStatement configFetchStatement, |
| PreparedStatement inputFetchStmt, |
| int configPosition) throws SQLException { |
| |
| // Get list of structures from database |
| ResultSet rsetConfig = configFetchStatement.executeQuery(); |
| while (rsetConfig.next()) { |
| long configId = rsetConfig.getLong(1); |
| Long fromConnectorId = rsetConfig.getLong(2); |
| String configName = rsetConfig.getString(3); |
| String configTYpe = rsetConfig.getString(4); |
| int configIndex = rsetConfig.getInt(5); |
| List<MInput<?>> configInputs = new ArrayList<MInput<?>>(); |
| |
| MConfig mDriverConfig = new MConfig(configName, configInputs); |
| mDriverConfig.setPersistenceId(configId); |
| |
| inputFetchStmt.setLong(configPosition, configId); |
| |
| ResultSet rsetInput = inputFetchStmt.executeQuery(); |
| while (rsetInput.next()) { |
| long inputId = rsetInput.getLong(1); |
| String inputName = rsetInput.getString(2); |
| long inputConfig = rsetInput.getLong(3); |
| short inputIndex = rsetInput.getShort(4); |
| String inputType = rsetInput.getString(5); |
| boolean inputSensitivity = rsetInput.getBoolean(6); |
| short inputStrLength = rsetInput.getShort(7); |
| String inputEnumValues = rsetInput.getString(8); |
| String value = rsetInput.getString(9); |
| |
| MInputType mit = MInputType.valueOf(inputType); |
| |
| MInput input = null; |
| switch (mit) { |
| case STRING: |
| input = new MStringInput(inputName, inputSensitivity, inputStrLength); |
| break; |
| case MAP: |
| input = new MMapInput(inputName, inputSensitivity); |
| break; |
| case BOOLEAN: |
| input = new MBooleanInput(inputName, inputSensitivity); |
| break; |
| case INTEGER: |
| input = new MIntegerInput(inputName, inputSensitivity); |
| break; |
| case ENUM: |
| input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(",")); |
| break; |
| default: |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0006, |
| "input-" + inputName + ":" + inputId + ":" |
| + "config-" + inputConfig + ":" + mit.name()); |
| } |
| |
| // Set persistent ID |
| input.setPersistenceId(inputId); |
| |
| // Set value |
| if(value == null) { |
| input.setEmpty(); |
| } else { |
| input.restoreFromUrlSafeValueString(value); |
| } |
| |
| if (mDriverConfig.getInputs().size() != inputIndex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0009, |
| "config: " + mDriverConfig |
| + "; input: " + input |
| + "; index: " + inputIndex |
| + "; expected: " + mDriverConfig.getInputs().size() |
| ); |
| } |
| |
| mDriverConfig.getInputs().add(input); |
| } |
| |
| if (mDriverConfig.getInputs().size() == 0) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0008, |
| "owner-" + fromConnectorId |
| + "; config: " + mDriverConfig |
| ); |
| } |
| |
| MConfigType configType = MConfigType.valueOf(configTYpe); |
| switch (configType) { |
| case JOB: |
| if (driverConfig.size() != configIndex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0010, |
| "owner-" + fromConnectorId |
| + "; config: " + configType |
| + "; index: " + configIndex |
| + "; expected: " + driverConfig.size() |
| ); |
| } |
| driverConfig.add(mDriverConfig); |
| break; |
| default: |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0007, |
| "connector-" + fromConnectorId + ":" + configType); |
| } |
| } |
| } |
| |
| private Direction findConfigDirection(long configId, Connection conn) throws SQLException { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| |
| try { |
| stmt = conn.prepareStatement(STMT_SELECT_SQ_CONFIG_DIRECTIONS); |
| stmt.setLong(1, configId); |
| rs = stmt.executeQuery(); |
| rs.next(); |
| return getDirection(rs.getLong(2), conn); |
| } finally { |
| if (rs != null) { |
| closeResultSets(rs); |
| } |
| if (stmt != null) { |
| closeStatements(stmt); |
| } |
| } |
| } |
| |
| /** |
| * Load configs and corresponding inputs from Derby database. |
| * |
| * Use given prepared statements to load all configs and corresponding inputs |
| * from Derby. |
| * |
| * @param linkConfig List of link configs that will be filled up |
| * @param fromConfig FROM job configs that will be filled up |
| * @param toConfig TO job configs that will be filled up |
| * @param configFetchStmt Prepared statement for fetching configs |
| * @param inputFetchStmt Prepare statement for fetching inputs |
| * @param conn Connection object that is used to find config direction. |
| * @throws SQLException In case of any failure on Derby side |
| */ |
| public void loadConfigTypes(List<MConfig> linkConfig, List<MConfig> fromConfig, |
| List<MConfig> toConfig, PreparedStatement configFetchStmt, PreparedStatement inputFetchStmt, |
| int configPosition, Connection conn) throws SQLException { |
| |
| // Get list of structures from database |
| ResultSet rsetConfig = configFetchStmt.executeQuery(); |
| while (rsetConfig.next()) { |
| long configId = rsetConfig.getLong(1); |
| Long configConnectorId = rsetConfig.getLong(2); |
| String configName = rsetConfig.getString(3); |
| String configType = rsetConfig.getString(4); |
| int configIndex = rsetConfig.getInt(5); |
| List<MInput<?>> configInputs = new ArrayList<MInput<?>>(); |
| |
| MConfig config = new MConfig(configName, configInputs); |
| config.setPersistenceId(configId); |
| |
| inputFetchStmt.setLong(configPosition, configId); |
| |
| ResultSet rsetInput = inputFetchStmt.executeQuery(); |
| while (rsetInput.next()) { |
| long inputId = rsetInput.getLong(1); |
| String inputName = rsetInput.getString(2); |
| long inputConfig = rsetInput.getLong(3); |
| short inputIndex = rsetInput.getShort(4); |
| String inputType = rsetInput.getString(5); |
| boolean inputSensitivity = rsetInput.getBoolean(6); |
| short inputStrLength = rsetInput.getShort(7); |
| String inputEnumValues = rsetInput.getString(8); |
| String value = rsetInput.getString(9); |
| |
| MInputType mit = MInputType.valueOf(inputType); |
| |
| MInput<?> input = null; |
| switch (mit) { |
| case STRING: |
| input = new MStringInput(inputName, inputSensitivity, inputStrLength); |
| break; |
| case MAP: |
| input = new MMapInput(inputName, inputSensitivity); |
| break; |
| case BOOLEAN: |
| input = new MBooleanInput(inputName, inputSensitivity); |
| break; |
| case INTEGER: |
| input = new MIntegerInput(inputName, inputSensitivity); |
| break; |
| case ENUM: |
| input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(",")); |
| break; |
| default: |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0006, |
| "input-" + inputName + ":" + inputId + ":" |
| + "config-" + inputConfig + ":" + mit.name()); |
| } |
| |
| // Set persistent ID |
| input.setPersistenceId(inputId); |
| |
| // Set value |
| if(value == null) { |
| input.setEmpty(); |
| } else { |
| input.restoreFromUrlSafeValueString(value); |
| } |
| |
| if (config.getInputs().size() != inputIndex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0009, |
| "config: " + config |
| + "; input: " + input |
| + "; index: " + inputIndex |
| + "; expected: " + config.getInputs().size() |
| ); |
| } |
| |
| config.getInputs().add(input); |
| } |
| |
| if (config.getInputs().size() == 0) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0008, |
| "connector-" + configConnectorId |
| + "; config: " + config |
| ); |
| } |
| |
| MConfigType mConfigType = MConfigType.valueOf(configType); |
| switch (mConfigType) { |
| case LINK: |
| if (linkConfig.size() != configIndex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0010, |
| "connector-" + configConnectorId |
| + "; config: " + config |
| + "; index: " + configIndex |
| + "; expected: " + linkConfig.size() |
| ); |
| } |
| linkConfig.add(config); |
| break; |
| case JOB: |
| Direction type = findConfigDirection(configId, conn); |
| List<MConfig> jobConfigs; |
| switch(type) { |
| case FROM: |
| jobConfigs = fromConfig; |
| break; |
| |
| case TO: |
| jobConfigs = toConfig; |
| break; |
| |
| default: |
| throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); |
| } |
| |
| if (jobConfigs.size() != configIndex) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0010, |
| "connector-" + configConnectorId |
| + "; config: " + config |
| + "; index: " + configIndex |
| + "; expected: " + jobConfigs.size() |
| ); |
| } |
| |
| jobConfigs.add(config); |
| break; |
| default: |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0007, |
| "connector-" + configConnectorId + ":" + config); |
| } |
| } |
| } |
| |
| private void createInputValues(String query, |
| long id, |
| List<MConfig> configs, |
| Connection conn) throws SQLException { |
| PreparedStatement stmt = null; |
| int result; |
| |
| try { |
| stmt = conn.prepareStatement(query); |
| |
| for (MConfig config : configs) { |
| for (MInput input : config.getInputs()) { |
| // Skip empty values as we're not interested in storing those in db |
| if (input.isEmpty()) { |
| continue; |
| } |
| stmt.setLong(1, id); |
| stmt.setLong(2, input.getPersistenceId()); |
| stmt.setString(3, input.getUrlSafeValueString()); |
| |
| result = stmt.executeUpdate(); |
| if (result != 1) { |
| throw new SqoopException(DerbyRepoError.DERBYREPO_0020, |
| Integer.toString(result)); |
| } |
| } |
| } |
| } finally { |
| closeStatements(stmt); |
| } |
| } |
| |
| /** |
| * Close all given Results set. |
| * |
| * Any occurring exception is silently ignored and logged. |
| * |
| * @param resultSets Result sets to close |
| */ |
| private void closeResultSets(ResultSet ... resultSets) { |
| if(resultSets == null) { |
| return; |
| } |
| for (ResultSet rs : resultSets) { |
| if(rs != null) { |
| try { |
| rs.close(); |
| } catch(SQLException ex) { |
| LOG.error("Exception during closing result set", ex); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Close all given statements. |
| * |
| * Any occurring exception is silently ignored and logged. |
| * |
| * @param stmts Statements to close |
| */ |
| private void closeStatements(Statement... stmts) { |
| if(stmts == null) { |
| return; |
| } |
| for (Statement stmt : stmts) { |
| if(stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException ex) { |
| LOG.error("Exception during closing statement", ex); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Log exception and all String variant of arbitrary number of objects. |
| * |
| * This method is useful to log SQLException with all objects that were |
| * used in the query generation to see where is the issue. |
| * |
| * @param throwable Arbitrary throwable object |
| * @param objects Arbitrary array of associated objects |
| */ |
| private void logException(Throwable throwable, Object ...objects) { |
| LOG.error("Exception in repository operation", throwable); |
| LOG.error("Associated objects: "+ objects.length); |
| for(Object object : objects) { |
| LOG.error("\t" + object.getClass().getSimpleName() + ": " + object.toString()); |
| } |
| } |
| } |