blob: 8846f65cddde4627fa52cb130d24f931eeba53bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.oracle.util;
import java.net.URL;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.sqoop.manager.oracle.OraOopTestCase;
import org.apache.sqoop.manager.oracle.OracleUtils;
/**
* Class to load an Oracle table with data based on configuration file.
*/
public final class OracleData {
private OracleData() {
}
enum KeyType {
PRIMARY, UNIQUE
}
private static ClassLoader classLoader;
static {
classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = OraOopTestCase.class.getClassLoader();
}
}
private static String getColumnList(OracleTableDefinition tableDefinition) {
StringBuilder result = new StringBuilder();
String delim = "";
for (OracleDataDefinition column : tableDefinition.getColumnList()) {
result.append(delim).append(OracleUtils.escapeIdentifier(column.getColumnName())).append(" ").append(
column.getDataType());
delim = ",\n";
}
return result.toString();
}
private static String
getDataExpression(List<OracleDataDefinition> columnList) {
StringBuilder result = new StringBuilder();
for (OracleDataDefinition column : columnList) {
result.append("l_ret_rec.").append(OracleUtils.escapeIdentifier(column.getColumnName())).append(" := ")
.append(column.getDataExpression()).append(";\n");
}
return result.toString();
}
private static void createPackageSpec(Connection conn,
OracleTableDefinition tableDefinition) throws Exception {
String pkgSql =
IOUtils.toString(classLoader.getResource(
"oraoop/pkg_tst_product_gen.psk").openStream());
pkgSql =
pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition));
pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
PreparedStatement stmt = conn.prepareStatement(pkgSql);
stmt.execute();
}
private static void createPackageBody(Connection conn,
OracleTableDefinition tableDefinition) throws Exception {
String pkgSql =
IOUtils.toString(classLoader.getResource(
"oraoop/pkg_tst_product_gen.pbk").openStream());
String columnList = getColumnList(tableDefinition);
if (tableDefinition.isIndexOrganizedTable()) {
columnList += "\n," + getKeyString(KeyType.PRIMARY, tableDefinition);
}
pkgSql =
pkgSql.replaceAll("\\$COLUMN_LIST", columnList);
pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
pkgSql =
pkgSql.replaceAll("\\$DATA_EXPRESSION_LIST",
getDataExpression(tableDefinition.getColumnList()));
pkgSql =
pkgSql.replaceAll("\\$TABLE_ORGANIZATION_CLAUSE",
tableDefinition.isIndexOrganizedTable()
? "ORGANIZATION INDEX OVERFLOW NOLOGGING" : "");
pkgSql =
pkgSql.replaceAll("\\$PARTITION_CLAUSE", tableDefinition
.getPartitionClause());
PreparedStatement stmt = conn.prepareStatement(pkgSql);
stmt.execute();
}
private static String getKeyColumns(KeyType keyType,
OracleTableDefinition tableDefinition) {
String result = null;
List<String> columns = null;
switch (keyType) {
case PRIMARY:
columns = tableDefinition.getPrimaryKeyColumns();
break;
case UNIQUE:
columns = tableDefinition.getUniqueKeyColumns();
break;
default:
throw new RuntimeException("Missing key type.");
}
if (columns != null && columns.size() > 0) {
StringBuilder keyColumnList = new StringBuilder();
String delim = "";
for (String column : columns) {
keyColumnList.append(delim).append(OracleUtils.escapeIdentifier(column));
delim = ",";
}
result = keyColumnList.toString();
}
return result;
}
private static String getKeyString(KeyType keyType,
OracleTableDefinition tableDefinition) {
String keySql = null;
String keyColumnList = getKeyColumns(keyType, tableDefinition);
if (keyColumnList!=null) {
keySql = "constraint \"$TABLE_NAME_"
+ ((keyType == KeyType.PRIMARY) ? "PK\" primary key"
: "UK\" unique") + "($PK_COLUMN_LIST) ";
keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList);
keySql =
keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
}
return keySql;
}
private static void createKey(Connection conn, KeyType keyType,
OracleTableDefinition tableDefinition) throws Exception {
String keySql = getKeyString(keyType, tableDefinition);
String keyColumnList = getKeyColumns(keyType, tableDefinition);
if (keySql!=null) {
keySql = "alter table \"$TABLE_NAME\" add " + keySql
+ " using index (create unique index \"$TABLE_NAME_"
+ ((keyType == KeyType.PRIMARY) ? "PK\"" : "UK\"")
+ " on \"$TABLE_NAME\"($PK_COLUMN_LIST) " + "parallel nologging)";
keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList);
keySql =
keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
PreparedStatement stmt = conn.prepareStatement(keySql);
stmt.execute();
}
}
public static int getParallelProcesses(Connection conn) throws Exception {
PreparedStatement stmt =
conn.prepareStatement("SELECT cc.value value"
+ "\n"
+ "FROM"
+ "\n"
+ " (SELECT to_number(value) value"
+ "\n"
+ " FROM v$parameter"
+ "\n"
+ " WHERE name='parallel_max_servers'"
+ "\n"
+ " ) pms,"
+ "\n"
+ " (SELECT to_number(value) value"
+ "\n"
+ " FROM v$parameter"
+ "\n"
+ " WHERE name='parallel_threads_per_cpu'"
+ "\n"
+ " ) ptpc,"
+ "\n"
+ " (SELECT to_number(value) value FROM v$parameter "
+ " WHERE name='cpu_count'"
+ "\n" + " ) cc");
ResultSet res = stmt.executeQuery();
res.next();
return res.getInt(1);
}
public static void createTable(Connection conn,
OracleTableDefinition tableDefinition, int parallelDegree,
int rowsPerSlave) throws Exception {
createPackageSpec(conn, tableDefinition);
createPackageBody(conn, tableDefinition);
CallableStatement procStmt =
conn.prepareCall("begin \"PKG_ODG_" + tableDefinition.getTableName()
+ "\".prc_load_table(?,?); end;");
procStmt.setInt(1, parallelDegree);
procStmt.setInt(2, rowsPerSlave);
procStmt.execute();
if (!tableDefinition.isIndexOrganizedTable()) {
createKey(conn, KeyType.PRIMARY, tableDefinition);
}
createKey(conn, KeyType.UNIQUE, tableDefinition);
}
public static void createTable(Connection conn, String fileName,
int parallelDegree, int rowsPerSlave) throws Exception {
URL file = classLoader.getResource("oraoop/" + fileName);
OracleTableDefinition tableDefinition = new OracleTableDefinition(file);
createTable(conn, tableDefinition, parallelDegree, rowsPerSlave);
}
}