// blob: 93166872fe289b476217d7b6a2e1b36774ffae87 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.derby;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_SCHEMA_SQOOP;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_DIRECTION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_LINK;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_LINK_INPUT;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SYSTEM;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_INSERT_DIRECTION;
import static org.junit.Assert.assertEquals;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.LinkedList;
import java.util.List;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MMapInput;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MToConfig;
import org.junit.After;
import org.junit.Before;
/**
* Abstract class with convenience methods for testing derby repository.
*/
abstract public class DerbyTestCase {
/** Repository version used by the no-argument fixture methods of this class. */
private static final int LATEST_SYSTEM_VERSION = 4;

/** Fully qualified class name of the embedded Derby JDBC driver. */
public static final String DERBY_DRIVER =
    "org.apache.derby.jdbc.EmbeddedDriver";

/** Base JDBC URL of the in-memory Derby database used by the tests. */
public static final String JDBC_URL =
    "jdbc:derby:memory:myDB";

/** Connection opened in {@link #setUp()} and closed in {@link #tearDown()}. */
private Connection connection;
/**
 * Load the embedded Derby driver and open the connection shared by the test.
 *
 * @throws Exception if the driver cannot be instantiated or the connection
 *         cannot be opened
 */
@Before
public void setUp() throws Exception {
  // Instantiate the driver through its no-arg constructor; the deprecated
  // Class#newInstance() would rethrow constructor exceptions unchecked.
  Class.forName(DERBY_DRIVER).getDeclaredConstructor().newInstance();
  // Create link to the database
  connection = DriverManager.getConnection(getStartJdbcUrl());
}
/**
 * Close the test connection and drop the in-memory database.
 *
 * The drop is attempted even when closing the connection fails, so that a
 * broken test does not leave its database behind for the next one.
 *
 * @throws Exception if closing the connection fails
 */
@After
public void tearDown() throws Exception {
  try {
    // Close active link
    if (connection != null) {
      connection.close();
    }
  } finally {
    try {
      // Drop in memory database
      DriverManager.getConnection(getStopJdbcUrl());
    } catch (SQLException ignored) {
      // Dropping Derby database leads always to exception
    }
  }
}
/**
 * Create derby schema for the given repository version.
 *
 * The schema is built incrementally: the base tables for any version above 0,
 * then the statements guarded by {@code version > 1} and {@code version > 3}.
 *
 * @param version repository version to create
 * @throws Exception if any of the statements fails
 */
protected void createSchema(int version) throws Exception {
  if (version > 0) {
    runQuery(QUERY_CREATE_SCHEMA_SQOOP);
    runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR);
    runQuery(QUERY_CREATE_TABLE_SQ_CONFIG);
    runQuery(QUERY_CREATE_TABLE_SQ_INPUT);
    runQuery(QUERY_CREATE_TABLE_SQ_LINK);
    runQuery(QUERY_CREATE_TABLE_SQ_JOB);
    runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT);
    runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
    runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
    runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
  }
  if (version > 1) {
    runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM);
    runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED);
    runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER);
    runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER);
    runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER);
    runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER);
  }
  if (version > 3) {
    runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION);
    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
    runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
    runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS);
    runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS);
    for (Direction direction : Direction.values()) {
      runQuery(STMT_INSERT_DIRECTION, direction.toString());
    }
  }
  // SQ_SYSTEM is only created for versions above 1, so the version rows can
  // only be recorded for those; inserting them unconditionally would fail
  // on a schema that does not have the table.
  if (version > 1) {
    runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '" + version + "')");
    runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) " +
        "VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_VERSION + "', '1')");
  }
}
/**
 * Create derby schema in version LATEST_SYSTEM_VERSION.
 *
 * @throws Exception
 */
protected void createSchema() throws Exception {
createSchema(LATEST_SYSTEM_VERSION);
}
/**
 * Execute an arbitrary statement against the derby memory repository.
 *
 * String arguments are bound with setString, Long arguments with setLong and
 * every other argument through its toString() value.
 *
 * @param query Query to execute
 * @param args Values for the positional parameters of the query, in order
 * @throws Exception
 */
protected void runQuery(String query, Object... args) throws Exception {
  PreparedStatement statement = null;
  try {
    statement = getDerbyDatabaseConnection().prepareStatement(query);
    // JDBC parameter positions are 1-based
    int position = 1;
    for (Object arg : args) {
      if (arg instanceof String) {
        statement.setString(position, (String) arg);
      } else if (arg instanceof Long) {
        statement.setLong(position, (Long) arg);
      } else {
        statement.setString(position, arg.toString());
      }
      position++;
    }
    statement.execute();
  } finally {
    if (statement != null) {
      statement.close();
    }
  }
}
/**
 * Run single, arbitrary insert query on derby memory repository.
 *
 * String arguments are bound with setString, Long arguments with setLong and
 * every other argument through its toString() value.
 *
 * @param query Query to execute
 * @param args Values for the positional parameters of the query, in order
 * @return Long id of newly inserted row (-1 if none).
 * @throws Exception
 */
protected Long runInsertQuery(String query, Object... args) throws Exception {
  PreparedStatement stmt = null;
  ResultSet rs = null;
  try {
    stmt = getDerbyDatabaseConnection().prepareStatement(query, PreparedStatement.RETURN_GENERATED_KEYS);
    for (int i = 0; i < args.length; ++i) {
      if (args[i] instanceof String) {
        stmt.setString(i + 1, (String)args[i]);
      } else if (args[i] instanceof Long) {
        stmt.setLong(i + 1, (Long)args[i]);
      } else {
        stmt.setString(i + 1, args[i].toString());
      }
    }
    // execute() returns false when the statement did not produce a result
    // set, which is the case for an insert.
    if (!stmt.execute()) {
      rs = stmt.getGeneratedKeys();
      // Honour the documented contract: a missing key means -1 rather than
      // an exception from reading a non-existent row.
      if (rs != null && rs.next()) {
        return rs.getLong(1);
      }
    }
  } finally {
    // Close the result set first and make sure the statement is closed even
    // if that fails.
    try {
      if (rs != null) {
        rs.close();
      }
    } finally {
      if (stmt != null) {
        stmt.close();
      }
    }
  }
  return -1L;
}
/**
 * Return the connection held by this test case.
 *
 * @return the connection field, which is assigned in setUp()
 */
protected Connection getDerbyDatabaseConnection() {
return connection;
}
/**
 * Return the base JDBC URL of the test database.
 *
 * @return the JDBC_URL constant
 */
protected String getJdbcUrl() {
return JDBC_URL;
}
/**
 * Return the JDBC URL that setUp() connects to.
 *
 * @return JDBC_URL with the ";create=true" attribute appended
 */
protected String getStartJdbcUrl() {
return JDBC_URL + ";create=true";
}
/**
 * Return the JDBC URL that tearDown() connects to in order to drop the database.
 *
 * @return JDBC_URL with the ";drop=true" attribute appended
 */
protected String getStopJdbcUrl() {
return JDBC_URL + ";drop=true";
}
/**
 * Load the testing connector and driver config in the version 2 layout.
 *
 * Inserts one SQ_CONNECTOR row, the connector and driver SQ_CONFIG rows
 * (using the SQ_CFG_OPERATION column) and the SQ_INPUT rows for both.
 * The statements refer to each other through hard-coded ids, so their
 * order must not be changed.
 *
 * @throws Exception
 */
protected void loadConnectorAndDriverConfigVersion2() throws Exception {
// Connector entry
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
// Value written into SQ_CFG_OWNER below; presumably the generated id of the
// connector inserted above - confirm against the SQ_CONNECTOR definition.
String connector = "1";
// Connector config entries
// "null" is spliced into the SQL unquoted (i.e. as SQL NULL) and selects the
// LINK type; the quoted operations produce JOB configs.
for(String operation : new String[] {"null", "'IMPORT'", "'EXPORT'"}) {
String type;
if(operation.equals("null")) {
type = "LINK";
} else {
type = "JOB";
}
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES("
+ connector + ", "
+ operation
+ ", 'C1', '"
+ type
+ "', 0)");
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES("
+ connector + ", "
+ operation
+ ", 'C2', '"
+ type
+ "', 1)");
}
// Driver config entries
// Five rows with a NULL owner in a single statement.
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) VALUES"
+ "(NULL, 'IMPORT', 'output', 'JOB', 0),"
+ "(NULL, 'IMPORT', 'throttling', 'JOB', 1),"
+ "(NULL, 'EXPORT', 'input', 'JOB', 0),"
+ "(NULL, 'EXPORT', 'throttling', 'JOB', 1),"
+ "(NULL, NULL, 'security', 'LINK', 0)");
// Connector input entries
// (i * 2 + 1) and (i * 2 + 2) enumerate the SQI_CONFIG values 1..6, two per
// iteration - presumably the ids of the six connector configs inserted above.
for(int i = 0; i < 3; i++) {
// First config
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)");
// Second config
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)");
}
// Driver input entries.
// The SQI_CONFIG values 7-11 used here presumably are the ids of the five
// driver configs inserted above - TODO confirm against the id generation.
runQuery("INSERT INTO SQOOP.SQ_INPUT (SQI_NAME, SQI_CONFIG, SQI_INDEX,"
+ " SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_ENUMVALS)"
+" VALUES ('security.maxConnections',11,0,'INTEGER','false',NULL,NULL),"
+ "('input.inputDirectory',9,0,'STRING','false',255,NULL),"
+ "('throttling.extractors',8,0,'INTEGER','false',NULL,NULL),"
+ "('throttling.loaders',8,1,'INTEGER','false',NULL,NULL),"
+ "('output.storageType',7,0,'ENUM','false',NULL,'HDFS'),"
+ "('output.outputFormat',7,1,'ENUM','false',NULL,'TEXT_FILE,SEQUENCE_FILE'),"
+ "('output.compression',7,2,'ENUM','false',NULL,'NONE,DEFAULT,DEFLATE,GZIP,BZIP2,LZO,LZ4,SNAPPY'),"
+ "('output.outputDirectory',7,3,'STRING','false',255,NULL),"
+ "('throttling.extractors',10,0,'INTEGER','false',NULL,NULL),"
+ "('throttling.loaders',10,1,'INTEGER','false',NULL,NULL)");
}
/**
 * Load the testing connector and driver config in the version 4 layout.
 *
 * Inserts one SQ_CONNECTOR row with its SQ_CONNECTOR_DIRECTIONS rows, the
 * connector and driver SQ_CONFIG rows (directions are stored in
 * SQ_CONFIG_DIRECTIONS) and the SQ_INPUT rows. The statements refer to each
 * other through hard-coded ids, so their order must not be changed.
 *
 * @throws Exception
 */
protected void loadConnectorAndDriverConfigVersion4() throws Exception {
// Receives the value returned by runInsertQuery for each connector config
Long configId;
// Connector entry
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
// "1" is presumably the generated id of the connector inserted above - confirm
for (String connector : new String[]{"1"}) {
// Directions
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 2)");
// connector configs
// A null direction produces LINK configs without a SQ_CONFIG_DIRECTIONS row;
// "1" and "2" produce JOB configs bound to that direction id.
for (String direction : new String[]{null, "1", "2"}) {
String type;
if (direction == null) {
type = "LINK";
} else {
type = "JOB";
}
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
if (direction != null) {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+ "VALUES(" + configId + ", " + direction + ")");
}
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
if (direction != null) {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+ "VALUES(" + configId + ", " + direction + ")");
}
}
}
// driver config
// Driver configs have a NULL owner and no SQ_CONFIG_DIRECTIONS rows
for (String type : new String[]{"JOB"}) {
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
}
// Input entries
// Connector LINK config: 0-3
// Connector job (FROM) config: 4-7
// Connector job (TO) config: 8-11
// Driver JOB config: 12-15
// (i * 2 + 1) and (i * 2 + 2) enumerate the SQI_CONFIG values 1..8, two per
// iteration - presumably the ids of the eight configs inserted above.
for (int i = 0; i < 4; i++) {
// First config
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)");
// Second config
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)");
}
}
/**
 * Load the testing connector and driver config that matches a system version.
 *
 * @param version system version (2 or 4)
 * @throws Exception
 * @throws AssertionError when the version is neither 2 nor 4
 */
protected void loadConnectorAndDriverConfig(int version) throws Exception {
  if (version == 2) {
    loadConnectorAndDriverConfigVersion2();
  } else if (version == 4) {
    loadConnectorAndDriverConfigVersion4();
  } else {
    throw new AssertionError("Invalid connector and framework version: " + version);
  }
}
/**
 * Load testing connector and driver config for version LATEST_SYSTEM_VERSION.
 *
 * @throws Exception
 */
protected void loadConnectorLinkConfig() throws Exception {
loadConnectorAndDriverConfig(LATEST_SYSTEM_VERSION);
}
/**
 * Insert the testing links and their input values into the repository.
 *
 * Versions 2 and 4 use exactly the same statements.
 *
 * @param version system version (2 or 4)
 * @throws Exception
 * @throws AssertionError when the version is neither 2 nor 4
 */
public void loadLinks(int version) throws Exception {
  if (version != 2 && version != 4) {
    throw new AssertionError("Invalid connector and framework version: " + version);
  }

  // Insert two links - CA and CB
  for (String linkName : new String[] {"CA", "CB"}) {
    runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
        + "VALUES('" + linkName + "', 1)");
  }

  // Give both links a value for each of the listed inputs
  String[] linkIds = {"1", "2"};
  String[] inputIds = {"1", "3", "13", "15"};
  for (String linkId : linkIds) {
    for (String inputId : inputIds) {
      runQuery("INSERT INTO SQOOP.SQ_LINK_INPUT"
          + "(SQ_LNKI_LINK, SQ_LNKI_INPUT, SQ_LNKI_VALUE) "
          + "VALUES(" + linkId + ", " + inputId + ", 'Value" + inputId + "')");
    }
  }
}
/**
 * Load testing link objects for version LATEST_SYSTEM_VERSION.
 *
 * @throws Exception
 */
public void loadLinks() throws Exception {
loadLinks(LATEST_SYSTEM_VERSION);
}
/**
 * Insert the testing jobs and their input values into the repository.
 *
 * @param version system version (2 or 4)
 * @throws Exception
 * @throws AssertionError when the version is neither 2 nor 4
 */
public void loadJobs(int version) throws Exception {
  // Suffix appended to the version 4 job names. It is never incremented, so
  // the names come out as JA0, JB0, JC0 and JD0.
  int nameSuffix = 0;

  if (version == 2) {
    for (String type : new String[] {"IMPORT", "EXPORT"}) {
      for (String name : new String[] {"JA", "JB"}) {
        runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_LINK, SQB_TYPE)"
            + " VALUES('" + name + "', 1, '" + type + "')");
      }
    }
    // Import inputs
    insertJobInputs(new String[] {"1", "2"}, new String[] {"5", "7", "17", "19"});
    // Export inputs
    insertJobInputs(new String[] {"3", "4"}, new String[] {"9", "11"});
  } else if (version == 4) {
    for (String name : new String[] {"JA", "JB", "JC", "JD"}) {
      runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_FROM_LINK, SQB_TO_LINK)"
          + " VALUES('" + name + nameSuffix + "', 1, 1)");
    }
    // Odd IDs inputs have values
    insertJobInputs(new String[] {"1", "2", "3", "4"},
        new String[] {"5", "9", "13", "7", "11", "15"});
  } else {
    throw new AssertionError("Invalid connector and framework version: " + version);
  }
}

/**
 * Insert one SQ_JOB_INPUT row per (job, input) pair, job by job, with the
 * value "Value" followed by the input id.
 */
private void insertJobInputs(String[] jobIds, String[] inputIds) throws Exception {
  for (String jobId : jobIds) {
    for (String inputId : inputIds) {
      runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT"
          + "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
          + "VALUES(" + jobId + ", " + inputId + ", 'Value" + inputId + "')");
    }
  }
}
/**
 * Load testing job objects for version LATEST_SYSTEM_VERSION.
 *
 * @throws Exception
 */
public void loadJobs() throws Exception {
loadJobs(LATEST_SYSTEM_VERSION);
}
/**
 * Add a second connector ("B") for tests that need multiple connectors and
 * register it for the direction ids 1 and 2.
 *
 * @throws Exception
 */
public void addConnector() throws Exception {
  // Connector entry
  Long newConnectorId = runInsertQuery(
      "INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
      + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");

  // One SQ_CONNECTOR_DIRECTIONS row per direction id
  for (int directionId = 1; directionId <= 2; directionId++) {
    runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES ("
        + newConnectorId + ", " + directionId + ")");
  }
}
/**
 * Insert testing counter groups, counters, submissions and one counter
 * value into the repository, in that order.
 *
 * @throws Exception
 */
public void loadSubmissions() throws Exception {
  String insertCounterGroups = "INSERT INTO SQOOP.SQ_COUNTER_GROUP "
      + "(SQG_NAME) "
      + "VALUES"
      + "('gA'), ('gB')";

  String insertCounters = "INSERT INTO SQOOP.SQ_COUNTER "
      + "(SQR_NAME) "
      + "VALUES"
      + "('cA'), ('cB')";

  // Five submissions; the first and the last one are written for job 1
  String insertSubmissions = "INSERT INTO SQOOP.SQ_SUBMISSION"
      + "(SQS_JOB, SQS_STATUS, SQS_CREATION_DATE, SQS_UPDATE_DATE,"
      + " SQS_EXTERNAL_ID, SQS_EXTERNAL_LINK, SQS_EXCEPTION,"
      + " SQS_EXCEPTION_TRACE)"
      + "VALUES "
      + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-01 01:01:01', 'job_1',"
      + "NULL, NULL, NULL),"
      + "(2, 'SUCCEEDED', '2012-01-01 01:01:01', '2012-01-02 01:01:01', 'job_2',"
      + " NULL, NULL, NULL),"
      + "(3, 'FAILED', '2012-01-01 01:01:01', '2012-01-03 01:01:01', 'job_3',"
      + " NULL, NULL, NULL),"
      + "(4, 'UNKNOWN', '2012-01-01 01:01:01', '2012-01-04 01:01:01', 'job_4',"
      + " NULL, NULL, NULL),"
      + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-05 01:01:01', 'job_5',"
      + " NULL, NULL, NULL)";

  String insertCounterValue = "INSERT INTO SQOOP.SQ_COUNTER_SUBMISSION "
      + "(SQRS_GROUP, SQRS_COUNTER, SQRS_SUBMISSION, SQRS_VALUE) "
      + "VALUES"
      + "(1, 1, 4, 300)";

  runQuery(insertCounterGroups);
  runQuery(insertCounters);
  runQuery(insertSubmissions);
  runQuery(insertCounterValue);
}
/**
 * Build the testing connector with both the FROM and the TO job config.
 *
 * @return result of getConnector(true, true)
 */
protected MConnector getConnector() {
return getConnector(true, true);
}
/**
 * Build the testing connector "A", optionally without one of its job configs.
 *
 * @param from whether to attach a FROM job config (null is passed otherwise)
 * @param to whether to attach a TO job config (null is passed otherwise)
 * @return new connector with the testing link config
 */
protected MConnector getConnector(boolean from, boolean to) {
  MFromConfig fromConfig = from ? getFromConfig() : null;
  MToConfig toConfig = to ? getToConfig() : null;
  MLinkConfig linkConfig = getLinkConfig();
  return new MConnector("A", "org.apache.sqoop.test.A", "1.0-test",
      linkConfig, fromConfig, toConfig);
}
/**
 * Build the testing driver.
 *
 * @return new MDriver created from getDriverConfig() and
 *         DriverBean.CURRENT_DRIVER_VERSION
 */
protected MDriver getDriver() {
return new MDriver(getDriverConfig(), DriverBean.CURRENT_DRIVER_VERSION);
}
/**
 * Set the first input of the first two connector link configs of the given
 * link to "Value1" and "Value2".
 *
 * @param link link whose inputs are filled in place
 */
protected void fillLink(MLink link) {
  List<MConfig> linkConfigs = link.getConnectorLinkConfig().getConfigs();

  MStringInput firstInput = (MStringInput) linkConfigs.get(0).getInputs().get(0);
  firstInput.setValue("Value1");

  MStringInput secondInput = (MStringInput) linkConfigs.get(1).getInputs().get(0);
  secondInput.setValue("Value2");
}
/**
 * Set the first input of the first two configs of the FROM, TO and driver
 * parts of the given job.
 *
 * The FROM and TO parts receive "Value1" and "Value2", the driver part
 * receives "Value13" and "Value15".
 *
 * @param job job whose inputs are filled in place
 */
protected void fillJob(MJob job) {
  // Both connector sides get the same pair of values, FROM first
  for (Direction direction : new Direction[] {Direction.FROM, Direction.TO}) {
    List<MConfig> connectorConfigs = job.getJobConfig(direction).getConfigs();
    ((MStringInput) connectorConfigs.get(0).getInputs().get(0)).setValue("Value1");
    ((MStringInput) connectorConfigs.get(1).getInputs().get(0)).setValue("Value2");
  }

  List<MConfig> driverConfigs = job.getDriverConfig().getConfigs();
  ((MStringInput) driverConfigs.get(0).getInputs().get(0)).setValue("Value13");
  ((MStringInput) driverConfigs.get(1).getInputs().get(0)).setValue("Value15");
}
/**
 * Build the testing link config.
 *
 * @return new MLinkConfig wrapping the configs from getConfigs()
 */
protected MLinkConfig getLinkConfig() {
return new MLinkConfig(getConfigs());
}
/**
 * Build the testing FROM job config.
 *
 * @return new MFromConfig wrapping the configs from getConfigs()
 */
protected MFromConfig getFromConfig() {
return new MFromConfig(getConfigs());
}
/**
 * Build the testing TO job config.
 *
 * @return new MToConfig wrapping the configs from getConfigs()
 */
protected MToConfig getToConfig() {
return new MToConfig(getConfigs());
}
/**
 * Build the testing driver config.
 *
 * @return new MDriverConfig wrapping the configs from getConfigs()
 */
protected MDriverConfig getDriverConfig() {
return new MDriverConfig(getConfigs());
}
/**
 * Build the list of testing configs shared by the link, FROM, TO and driver
 * config factories of this class.
 *
 * The list holds config "C1" with inputs I1 (string) and I2 (map), followed
 * by config "C2" with inputs I3 (string) and I4 (map).
 *
 * @return freshly created list with the two configs
 */
protected List<MConfig> getConfigs() {
  List<MConfig> configs = new LinkedList<MConfig>();

  // First config: C1 with inputs I1 and I2. The inputs are added directly so
  // that no raw MInput local (and no unchecked add) is needed.
  List<MInput<?>> inputs = new LinkedList<MInput<?>>();
  inputs.add(new MStringInput("I1", false, (short)30));
  inputs.add(new MMapInput("I2", false));
  configs.add(new MConfig("C1", inputs));

  // Second config: C2 with inputs I3 and I4, in a list of its own
  inputs = new LinkedList<MInput<?>>();
  inputs.add(new MStringInput("I3", false, (short)30));
  inputs.add(new MMapInput("I4", false));
  configs.add(new MConfig("C2", inputs));

  return configs;
}
/**
 * Find out number of entries in given table.
 *
 * The table name is concatenated into the query as is.
 *
 * @param table Table name
 * @return Number of rows in the table
 * @throws Exception
 */
protected long countForTable(String table) throws Exception {
  Statement stmt = null;
  ResultSet rs = null;
  try {
    stmt = getDerbyDatabaseConnection().createStatement();
    rs = stmt.executeQuery("SELECT COUNT(*) FROM "+ table);
    rs.next();
    return rs.getLong(1);
  } finally {
    // Close the result set before its statement, and close the statement
    // even when closing the result set fails.
    try {
      if (rs != null) {
        rs.close();
      }
    } finally {
      if (stmt != null) {
        stmt.close();
      }
    }
  }
}
/**
 * Check that the given table holds exactly the expected number of rows.
 *
 * @param table Table name
 * @param expected Expected number of rows
 * @throws Exception
 */
protected void assertCountForTable(String table, long expected)
    throws Exception {
  assertEquals(expected, countForTable(table));
}
/**
 * Print out the repository content for advanced debugging.
 *
 * This method is currently unused, but it might be helpful in the future,
 * so it is kept here.
 *
 * @throws Exception
 */
protected void generateDatabaseState() throws Exception {
  String[] tableNames = {
      "SQ_CONNECTOR", "SQ_CONFIG", "SQ_INPUT",
      "SQ_LINK", "SQ_LINK_INPUT", "SQ_JOB", "SQ_JOB_INPUT"
  };
  for (String tableName : tableNames) {
    generateTableState("SQOOP." + tableName);
  }
}
/**
 * Print out the content of one single table to standard output.
 *
 * The output is a "Table NAME:" line, a header line with the column names,
 * one line per row and a trailing empty line; cells are separated by "|".
 *
 * @param table Table name
 * @throws Exception
 */
protected void generateTableState(String table) throws Exception {
  PreparedStatement select = null;
  ResultSet rows = null;
  try {
    select = getDerbyDatabaseConnection().prepareStatement("SELECT * FROM " + table);
    rows = select.executeQuery();
    ResultSetMetaData meta = rows.getMetaData();

    System.out.println("Table " + table + ":");
    int columns = meta.getColumnCount();

    // Header line with the column names
    StringBuilder line = new StringBuilder();
    for (int column = 1; column <= columns; column++) {
      line.append("| ").append(meta.getColumnName(column)).append(" ");
    }
    line.append("|");
    System.out.println(line.toString());

    // One line per row, reusing the same builder
    while (rows.next()) {
      line.setLength(0);
      for (int column = 1; column <= columns; column++) {
        line.append("| ").append(rows.getString(column)).append(" ");
      }
      line.append("|");
      System.out.println(line.toString());
    }
    System.out.println("");
  } finally {
    if (rows != null) {
      rows.close();
    }
    if (select != null) {
      select.close();
    }
  }
}
}