blob: a0ab604ff2bd47a278c9a23b1768941470bbc1b9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.oracle;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.SqoopOptions.IncrementalMode;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.ManagerFactory;
import org.apache.sqoop.metastore.JobData;
import org.apache.sqoop.manager.OracleManager;
import org.apache.sqoop.manager.oracle.OraOopOutputFormatUpdate.UpdateMode;
import org.apache.sqoop.manager.oracle.OraOopUtilities.
JdbcOracleThinConnectionParsingError;
import static org.apache.sqoop.mapreduce.JobBase.HADOOP_MAP_TASK_MAX_ATTEMTPS;
/**
* OraOop manager - if OraOop cannot be used it should fall back to the default
* OracleManager.
*
* To increase the amount of heap memory available to the mappers:
* -Dmapred.child.java.opts=-Xmx4000M
* To prevent failed mapper tasks from being reattempted:
* -Dmapred.map.max.attempts=1
*/
public class OraOopManagerFactory extends ManagerFactory {
// Logger registered under the OraOop package name. It is never read in this
// class (hence the "unused" suppression).
// NOTE(review): presumably held so the package-level logger exists before any
// other OraOop class logs - confirm.
@SuppressWarnings("unused")
private static final OraOopLog ORAOOP_LOG = OraOopLogFactory
.getLog("org.apache.sqoop.manager.oracle");
// Logger used for every message emitted by this factory.
private static final OraOopLog LOG = OraOopLogFactory
.getLog(OraOopManagerFactory.class.getName());
// When this class is loaded, register the OraOop site-template file and then
// the OraOop site file as Hadoop default configuration resources.
// NOTE(review): the template is registered first, presumably so that values in
// the site file take precedence over it - confirm against Hadoop's
// Configuration.addDefaultResource semantics.
static {
Configuration
.addDefaultResource(OraOopConstants.ORAOOP_SITE_TEMPLATE_FILENAME);
Configuration.addDefaultResource(OraOopConstants.ORAOOP_SITE_FILENAME);
}
/**
 * Decides whether OraOop takes responsibility for the given Sqoop job.
 *
 * The job is only considered when its connect string starts with
 * "jdbc:oracle" (case-insensitive) and OraOop is enabled. Import jobs must
 * additionally use enough mappers, be non-incremental and be table-based;
 * export jobs must use enough mappers. For both job types the Oracle
 * database is contacted to check that the target object is a table.
 * Index-organized tables are only accepted for imports when the data chunk
 * method is PARTITION.
 *
 * When the job is accepted, the desired number of mappers, the Oracle
 * session action name, the Oracle version, the consistent-read SCN (if
 * consistent reads are enabled) and the per-mapper JDBC URLs are stored in
 * the job's configuration.
 *
 * @param jobData the Sqoop job being offered to this factory; a null value
 *          is tolerated and results in the job being declined
 * @return the OraOopConnManager when OraOop accepts the job, otherwise null
 *         so that Sqoop can fall back to another manager
 * @throws RuntimeException if the Oracle database cannot be connected to,
 *           or the SCN required for a consistent read cannot be determined
 */
@Override
public ConnManager accept(JobData jobData) {
  // The null check has to come before any use of jobData; previously the
  // options were dereferenced first, which made the check unreachable.
  if (jobData == null) {
    return null;
  }

  SqoopOptions sqoopOptions = jobData.getSqoopOptions();

  OraOopUtilities.enableDebugLoggingIfRequired(sqoopOptions.getConf());
  LOG.debug(String.format("%s can be called by Sqoop!",
      OraOopConstants.ORAOOP_PRODUCT_NAME));

  String connectString = sqoopOptions.getConnectString();
  if (connectString == null
      || !connectString.toLowerCase().trim().startsWith("jdbc:oracle")) {
    return null;
  }

  if (!isOraOopEnabled(sqoopOptions)) {
    return null;
  }

  ConnManager result = null;
  OraOopConnManager oraOopConnManager = null;

  OraOopConstants.Sqoop.Tool jobType = getSqoopJobType(jobData);
  OraOopUtilities.rememberSqoopJobType(jobType, sqoopOptions.getConf());

  // Messages collected while probing the database; they are only shown
  // (after the welcome banner) if OraOop ends up accepting the job.
  List<OraOopLogMessage> messagesToDisplayAfterWelcome =
      new ArrayList<OraOopLogMessage>();

  switch (jobType) {
    case IMPORT:
      if (isNumberOfImportMappersOkay(sqoopOptions)
          && !isSqoopImportIncremental(jobData)
          && isSqoopImportJobTableBased(sqoopOptions)) {

        // At this stage, the Sqoop import job appears to be one we're
        // interested in accepting. We now need to connect to
        // the Oracle database to perform more tests...
        oraOopConnManager = new OraOopConnManager(sqoopOptions);

        try {
          Connection connection = oraOopConnManager.getConnection();

          if (isSqoopTableAnOracleTable(connection, sqoopOptions
              .getUsername(),
              oraOopConnManager.getOracleTableContext())) {

            // OraOop will not accept responsibility for an Index
            // Organized Table (IOT)...
            if (!isSqoopTableAnIndexOrganizedTable(connection,
                oraOopConnManager.getOracleTableContext())) {
              // OraOop accepts responsibility for this Sqoop job!
              result = oraOopConnManager;
            } else {
              OraOopConstants.OraOopOracleDataChunkMethod method =
                  OraOopUtilities.getOraOopOracleDataChunkMethod(
                      sqoopOptions.getConf());
              if (method == OraOopConstants.
                  OraOopOracleDataChunkMethod.PARTITION) {
                // ...unless the data is being chunked by partition.
                result = oraOopConnManager;
              } else {
                LOG.info(String.format("%s will not process this Sqoop"
                    + " connection, as the Oracle table %s is an"
                    + " index-organized table. If the table is"
                    + " partitioned, set "
                    + OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD
                    + " to "
                    + OraOopConstants.OraOopOracleDataChunkMethod.PARTITION
                    + ".",
                    OraOopConstants.ORAOOP_PRODUCT_NAME,
                    oraOopConnManager.getOracleTableContext().toString()));
              }
            }
          }
        } catch (SQLException ex) {
          throw new RuntimeException(String.format(
              "Unable to connect to the Oracle database at %s\n"
                  + "Error:%s", sqoopOptions.getConnectString(), ex
                  .getMessage()), ex);
        }
      }
      break;

    case EXPORT:
      if (isNumberOfExportMappersOkay(sqoopOptions)) {

        // At this stage, the Sqoop export job appears to be one we're
        // interested in accepting. We now need to connect to
        // the Oracle database to perform more tests...
        oraOopConnManager = new OraOopConnManager(sqoopOptions);

        Connection connection = null;
        try {
          connection = oraOopConnManager.getConnection();
        } catch (SQLException ex) {
          throw new RuntimeException(String.format(
              "Unable to connect to the Oracle database at %s\n"
                  + "Error:%s", sqoopOptions.getConnectString(), ex
                  .getMessage()), ex);
        }

        try {
          createAnyRequiredOracleObjects(sqoopOptions, connection,
              oraOopConnManager, messagesToDisplayAfterWelcome);

          if (isSqoopTableAnOracleTable(connection, sqoopOptions
              .getUsername(),
              oraOopConnManager.getOracleTableContext())) {
            // OraOop accepts responsibility for this Sqoop job!
            result = oraOopConnManager;
          }
        } catch (SQLException ex) {
          // Best effort: the failure is logged and the job is declined
          // (result stays null) rather than aborted.
          LOG.error(OraOopUtilities.getFullExceptionMessage(ex));
        }
      }
      break;

    default:
      // OraOop doesn't know how to handle other types of jobs - so won't
      // accept them.
      break;
  }

  // Nothing more to do unless OraOop has accepted this Sqoop job...
  if (result == null) {
    return null;
  }

  showUserTheOraOopWelcomeMessage();

  for (OraOopLogMessage message : messagesToDisplayAfterWelcome) {
    message.log(LOG);
  }

  // By the time we get into getSplits(), the number of mappers
  // stored in the config can be either 4 or 1 - so it seems
  // a bit unreliable. We'll use our own property name to ensure
  // getSplits() gets the correct value...
  sqoopOptions.getConf().setInt(
      OraOopConstants.ORAOOP_DESIRED_NUMBER_OF_MAPPERS,
      sqoopOptions.getNumMappers());

  // Generate the "action" name that we'll assign to our Oracle sessions
  // so that the user knows which Oracle sessions belong to OraOop...
  sqoopOptions.getConf().set(
      OraOopConstants.ORACLE_SESSION_ACTION_NAME,
      getOracleSessionActionName(jobData));

  OraOopUtilities.appendJavaSecurityEgd(sqoopOptions.getConf());

  // Get the Oracle database version. Failure here is logged but does not
  // stop the job...
  try {
    OracleVersion oracleVersion =
        OraOopOracleQueries.getOracleVersion(result.getConnection());
    LOG.info(String.format("Oracle Database version: %s",
        oracleVersion.getBanner()));
    sqoopOptions.getConf().setInt(
        OraOopConstants.ORAOOP_ORACLE_DATABASE_VERSION_MAJOR,
        oracleVersion.getMajor());
    sqoopOptions.getConf().setInt(
        OraOopConstants.ORAOOP_ORACLE_DATABASE_VERSION_MINOR,
        oracleVersion.getMinor());
  } catch (SQLException ex) {
    LOG.error("Unable to obtain the Oracle database version.", ex);
  }

  // For consistent reads, use the SCN supplied in the configuration, or
  // fall back to the database's current SCN when none (0) was given...
  try {
    if (sqoopOptions.getConf().getBoolean(
        OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ, false)) {
      long scn =
          sqoopOptions.getConf().getLong(
              OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ_SCN, 0);
      if (scn == 0) {
        scn = OraOopOracleQueries.getCurrentScn(result.getConnection());
      }
      sqoopOptions.getConf().setLong(
          OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ_SCN, scn);
      LOG.info("Performing a consistent read using SCN: " + scn);
    }
  } catch (SQLException ex) {
    throw new RuntimeException("Unable to determine SCN of database.",
        ex);
  }

  // Generate the JDBC URLs to be used by each mapper...
  setMapperConnectionDetails(oraOopConnManager, jobData);

  // Show the user the Oracle command that can be used to kill this
  // OraOop job via Oracle...
  showUserTheOracleCommandToKillOraOop(sqoopOptions);

  return result;
}
/**
 * Works out which JDBC URL each mapper should use and stores the URLs in
 * the job's configuration.
 *
 * If the database reports active RAC instances, and there are at least the
 * configured minimum number of them, the mappers are either all pointed at
 * the configured Oracle service name or spread across the individual
 * instances. Otherwise every mapper receives a URL rebuilt from the TNS
 * name, SID or service name found in the job's own connect string.
 *
 * Nothing is stored when dynamic JDBC URL generation has been disabled, or
 * when the connect string cannot be parsed as an Oracle "thin" URL.
 *
 * @param oraOopConnManager the manager supplying the database connection
 * @param jobData the Sqoop job whose configuration receives the URLs
 * @throws RuntimeException if the database cannot be connected to, the RAC
 *           configuration cannot be queried, or the configured service name
 *           cannot be connected to
 */
private void setMapperConnectionDetails(OraOopConnManager oraOopConnManager,
    JobData jobData) {

  // Ensure we have a connection to the database...
  Connection connection = null;
  try {
    connection = oraOopConnManager.getConnection();
  } catch (SQLException ex) {
    // Keep the SQLException as the cause so the stack trace is not lost.
    throw new RuntimeException(String.format(
        "Unable to connect to the Oracle database at %s\n" + "Error:%s",
        jobData.getSqoopOptions().getConnectString(), ex.getMessage()), ex);
  }

  // Query v$active_instances to get a list of all instances in the Oracle RAC
  // (assuming this *could* be a RAC)...
  List<OracleActiveInstance> activeInstances = null;
  try {
    activeInstances =
        OraOopOracleQueries.getOracleActiveInstances(connection);
  } catch (SQLException ex) {
    throw new RuntimeException(
        "An error was encountered when attempting to determine the "
            + "configuration of the Oracle RAC.",
        ex);
  }

  if (activeInstances == null) {
    LOG.info("This Oracle database is not a RAC.");
  } else {
    LOG.info("This Oracle database is a RAC.");
  }

  // Is dynamic JDBC URL generation disabled?...
  if (OraOopUtilities.oracleJdbcUrlGenerationDisabled(jobData
      .getSqoopOptions().getConf())) {
    LOG.info(String
        .format(
            "%s will not use dynamically generated JDBC URLs - this feature "
                + "has been disabled.",
            OraOopConstants.ORAOOP_PRODUCT_NAME));
    return;
  }

  boolean generateRacBasedJdbcUrls = false;

  // Decide whether this is a multi-instance RAC, and whether we need to do
  // anything more...
  if (activeInstances != null) {
    generateRacBasedJdbcUrls = true;

    if (activeInstances.size() < OraOopUtilities
        .getMinNumberOfOracleRacActiveInstancesForDynamicJdbcUrlUse(jobData
            .getSqoopOptions().getConf())) {
      LOG.info(String.format(
          "There are only %d active instances in the Oracle RAC. "
              + "%s will not bother utilizing dynamically generated JDBC URLs.",
          activeInstances.size(), OraOopConstants.ORAOOP_PRODUCT_NAME));
      generateRacBasedJdbcUrls = false;
    }
  }

  // E.g. jdbc:oracle:thin:@localhost.localdomain:1521:orcl
  String jdbcConnectStr = jobData.getSqoopOptions().getConnectString();

  // Parse the JDBC URL to obtain the port number for the TNS listener...
  String jdbcHost = "";
  int jdbcPort = 0;
  String jdbcSid = "";
  String jdbcService = "";
  String jdbcTnsName = "";
  try {
    OraOopJdbcUrl oraOopJdbcUrl = new OraOopJdbcUrl(jdbcConnectStr);
    OraOopUtilities.JdbcOracleThinConnection jdbcConnection =
        oraOopJdbcUrl.parseJdbcOracleThinConnectionString();
    jdbcHost = jdbcConnection.getHost();
    jdbcPort = jdbcConnection.getPort();
    jdbcSid = jdbcConnection.getSid();
    jdbcService = jdbcConnection.getService();
    jdbcTnsName = jdbcConnection.getTnsName();
  } catch (JdbcOracleThinConnectionParsingError ex) {
    // The parsing error is handed to the logger; it used to be passed as a
    // surplus String.format argument and was therefore silently discarded.
    LOG.info(String.format(
        "Unable to parse the JDBC connection URL \"%s\" as a connection "
            + "that uses the Oracle 'thin' JDBC driver.\n"
            + "This problem prevents %s from being able to dynamically generate "
            + "JDBC URLs that specify 'dedicated server connections' or spread "
            + "mapper sessions across multiple Oracle instances.\n"
            + "If the JDBC driver-type is 'OCI' (instead of 'thin'), then "
            + "load-balancing should be appropriately managed automatically.",
        jdbcConnectStr, OraOopConstants.ORAOOP_PRODUCT_NAME), ex);
    return;
  }

  if (generateRacBasedJdbcUrls) {

    // Retrieve the Oracle service name to use when connecting to the RAC...
    String oracleServiceName =
        OraOopUtilities.getOracleServiceName(jobData.getSqoopOptions()
            .getConf());

    // Generate JDBC URLs for each of the mappers...
    if (!oracleServiceName.isEmpty()) {
      if (!generateRacJdbcConnectionUrlsByServiceName(jdbcHost, jdbcPort,
          oracleServiceName, jobData)) {
        throw new RuntimeException(String.format(
            "Unable to connect to the Oracle database at %s "
                + "via the service name \"%s\".", jobData.getSqoopOptions()
                .getConnectString(), oracleServiceName));
      }
    } else {
      generateJdbcConnectionUrlsByActiveInstance(activeInstances, jdbcPort,
          jobData);
    }
  } else {
    generateJdbcConnectionUrlsByTnsnameSidOrService(jdbcHost, jdbcPort,
        jdbcSid, jdbcService, jdbcTnsName, jobData);
  }
}
/**
 * Builds a single JDBC URL and assigns it to every mapper of the job.
 *
 * The URL is built from the TNS name when one is given, otherwise from the
 * SID when one is given, otherwise from the service name.
 *
 * @param hostName host used for SID and service-name URLs
 * @param port listener port used for SID and service-name URLs
 * @param sid Oracle SID; may be null or empty
 * @param serviceName Oracle service name, used as the last resort
 * @param tnsName TNS name; may be null or empty
 * @param jobData the Sqoop job whose configuration receives the URL
 */
private void generateJdbcConnectionUrlsByTnsnameSidOrService(String hostName,
    int port, String sid, String serviceName, String tnsName, JobData jobData) {

  boolean haveTnsName = tnsName != null && !tnsName.isEmpty();
  boolean haveSid = sid != null && !sid.isEmpty();

  final String url;
  if (haveTnsName) {
    url = OraOopUtilities.generateOracleTnsNameJdbcUrl(tnsName);
  } else if (haveSid) {
    url = OraOopUtilities.generateOracleSidJdbcUrl(hostName, port, sid);
  } else {
    url = OraOopUtilities.generateOracleServiceNameJdbcUrl(hostName, port,
        serviceName);
  }

  // Every mapper gets the same URL, recorded under its own property so each
  // mapper can look up the one meant for it.
  int mapper = 0;
  while (mapper < jobData.getSqoopOptions().getNumMappers()) {
    storeJdbcUrlForMapper(mapper, url, jobData);
    mapper++;
  }
}
/**
 * Points every mapper at the given Oracle service name, provided a test
 * connection through that service succeeds.
 *
 * @param hostName host to place in the generated URL
 * @param port listener port to place in the generated URL
 * @param serviceName Oracle service name to connect through
 * @param jobData the Sqoop job supplying credentials and receiving the URLs
 * @return true if the test connection worked and the URLs were stored;
 *         false if the test connection failed (nothing is stored)
 */
private boolean generateRacJdbcConnectionUrlsByServiceName(String hostName,
    int port, String serviceName, JobData jobData) {

  String serviceUrl =
      OraOopUtilities.generateOracleServiceNameJdbcUrl(hostName, port,
          serviceName);

  boolean connectionWorks =
      testDynamicallyGeneratedOracleRacInstanceConnection(
          serviceUrl,
          jobData.getSqoopOptions().getUsername(),
          jobData.getSqoopOptions().getPassword(),
          jobData.getSqoopOptions().getConnectionParams(),
          false, // <- ShowInstanceSysTimestamp
          ""); // <- instanceDescription

  if (!connectionWorks) {
    return false;
  }

  LOG.info(String.format(
      "%s will load-balance sessions across the Oracle RAC instances "
          + "by connecting each mapper to the Oracle Service \"%s\".",
      OraOopConstants.ORAOOP_PRODUCT_NAME, serviceName));

  // Record the URL under each mapper's own property so that each mapper
  // knows which one to use.
  int mapper = 0;
  while (mapper < jobData.getSqoopOptions().getNumMappers()) {
    storeJdbcUrlForMapper(mapper, serviceUrl, jobData);
    mapper++;
  }
  return true;
}
/**
 * Spreads the job's mappers across the individual instances of an Oracle
 * RAC.
 *
 * A JDBC URL is generated for each active instance and tested; only URLs
 * that can actually be connected to are kept. If fewer usable URLs remain
 * than the configured minimum (or none at all), nothing is stored and the
 * mappers keep using the job's original connect string. Otherwise the
 * usable URLs are handed out to the mappers in round-robin order.
 *
 * @param activeInstances the active instances reported by the database
 * @param jdbcPort the listener port to use for every generated URL
 * @param jobData the Sqoop job supplying credentials and receiving the URLs
 */
private void generateJdbcConnectionUrlsByActiveInstance(
    List<OracleActiveInstance> activeInstances, int jdbcPort,
    JobData jobData) {

  SqoopOptions sqoopOptions = jobData.getSqoopOptions();

  // Generate JDBC URLs for each of the instances in the RAC, keeping only
  // those we can connect to...
  List<OraOopUtilities.JdbcOracleThinConnection> usableConnections =
      new ArrayList<OraOopUtilities.JdbcOracleThinConnection>(
          activeInstances.size());

  for (OracleActiveInstance activeInstance : activeInstances) {
    OraOopUtilities.JdbcOracleThinConnection candidate =
        new OraOopUtilities.JdbcOracleThinConnection(
            activeInstance.getHostName(),
            jdbcPort, activeInstance.getInstanceName(), "", "");

    if (testDynamicallyGeneratedOracleRacInstanceConnection(
        candidate.toString(), sqoopOptions.getUsername(),
        sqoopOptions.getPassword(), sqoopOptions.getConnectionParams(),
        true, activeInstance.getInstanceName())) {
      usableConnections.add(candidate);
    }
  }

  // If there are multiple JDBC URLs that work okay for the RAC, then we'll
  // make use of them. The explicit isEmpty() check protects the round-robin
  // below when the configured minimum is zero or negative.
  int minInstances =
      OraOopUtilities
          .getMinNumberOfOracleRacActiveInstancesForDynamicJdbcUrlUse(
              sqoopOptions.getConf());
  if (usableConnections.isEmpty()
      || usableConnections.size() < minInstances) {
    LOG.info(String
        .format(
            "%s will not attempt to load-balance sessions across instances "
                + "of an Oracle RAC - as multiple JDBC URLs to the "
                + "Oracle RAC could not be dynamically generated.",
            OraOopConstants.ORAOOP_PRODUCT_NAME));
    return;
  }

  StringBuilder msg = new StringBuilder();
  // Note the trailing space after "of": the two literals used to run
  // together as "ofthe".
  msg.append(String
      .format(
          "%s will load-balance sessions across the following instances of "
              + "the Oracle RAC:\n",
          OraOopConstants.ORAOOP_PRODUCT_NAME));
  for (OraOopUtilities.JdbcOracleThinConnection thinConnection
      : usableConnections) {
    msg.append(String.format("\tInstance: %s \t URL: %s\n",
        thinConnection.getSid(), thinConnection.toString()));
  }
  LOG.info(msg.toString());

  // Now store these connection strings in such a way that each mapper knows
  // which one to use, cycling through the usable instances...
  int numMappers = sqoopOptions.getNumMappers();
  int numConnections = usableConnections.size();
  for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) {
    OraOopUtilities.JdbcOracleThinConnection thinUrl =
        usableConnections.get(idxMapper % numConnections);
    storeJdbcUrlForMapper(idxMapper, thinUrl.toString(), jobData);
  }
}
/**
 * Checks whether a dynamically generated JDBC URL can be connected to.
 *
 * A connection is opened with the given credentials and closed again. When
 * requested, the database time reported through that connection is logged.
 * Any SQLException (while connecting, querying the time, or closing) is
 * logged as a warning and reported as a failed test.
 *
 * @param url the JDBC URL to test
 * @param userName the user to connect as
 * @param password the user's password
 * @param additionalProps extra connection properties
 * @param showInstanceSysTimestamp whether to log the database time
 * @param instanceDescription label used in the database-time log message
 * @return true if the connection was opened, used and closed without error
 */
private boolean testDynamicallyGeneratedOracleRacInstanceConnection(
    String url, String userName, String password, Properties additionalProps,
    boolean showInstanceSysTimestamp, String instanceDescription) {

  boolean result = false;

  // Test the connection...
  try {
    Connection testConnection =
        OracleConnectionFactory.createOracleJdbcConnection(
            OraOopConstants.ORACLE_JDBC_DRIVER_CLASS, url, userName,
            password, additionalProps);
    try {
      // Show the system time on each instance...
      if (showInstanceSysTimestamp) {
        LOG.info(String.format("\tDatabase time on %s is %s",
            instanceDescription, OraOopOracleQueries
                .getSysTimeStamp(testConnection)));
      }
    } finally {
      // Always release the test connection, even if the timestamp query
      // failed; previously it was leaked in that case.
      testConnection.close();
    }

    result = true;
  } catch (SQLException ex) {
    LOG.warn(
        String
            .format(
                "The dynamically generated JDBC URL \"%s\" was unable to "
                    + "connect to an instance in the Oracle RAC.",
                url), ex);
  }

  return result;
}
/**
 * Records the JDBC URL a particular mapper should use.
 *
 * The URL is written to the job's configuration under a property name
 * derived from the mapper index.
 *
 * @param mapperIdx index of the mapper the URL is meant for
 * @param jdbcUrl the JDBC URL to record
 * @param jobData the Sqoop job whose configuration is updated
 */
private void storeJdbcUrlForMapper(int mapperIdx, String jdbcUrl,
    JobData jobData) {

  Configuration jobConf = jobData.getSqoopOptions().getConf();
  String propertyName =
      OraOopUtilities.getMapperJdbcUrlPropertyName(mapperIdx, jobConf);

  StringBuilder debugMsg = new StringBuilder("Setting mapper url ");
  debugMsg.append(propertyName).append(" = ").append(jdbcUrl);
  LOG.debug(debugMsg.toString());

  jobConf.set(propertyName, jdbcUrl);
}
/**
 * Reports whether OraOop may be used for a job with the given options.
 *
 * OraOop counts as disabled when the "disabled" property is set to one of
 * "true", "yes", "y" or "1" (case-insensitive), or when the job is not
 * running in direct mode. An informational message is logged when it is
 * disabled.
 *
 * @param sqoopOptions the options of the job being examined
 * @return true if OraOop is enabled, false otherwise
 */
private boolean isOraOopEnabled(SqoopOptions sqoopOptions) {

  String disabledSetting =
      sqoopOptions.getConf().get(OraOopConstants.ORAOOP_DISABLED, "false")
          .toLowerCase();

  boolean disabledByProperty = false;
  for (String affirmative : new String[] {"true", "yes", "y", "1"}) {
    if (disabledSetting.equalsIgnoreCase(affirmative)) {
      disabledByProperty = true;
      break;
    }
  }

  // Direct mode is only consulted when the property has not already
  // disabled OraOop.
  boolean enabled = !disabledByProperty && sqoopOptions.isDirect();

  if (!enabled) {
    LOG.info(String.format("%s is disabled.",
        OraOopConstants.ORAOOP_PRODUCT_NAME));
  }
  return enabled;
}
/**
 * Maps the name of the job's Sqoop tool onto the OraOop tool enumeration.
 *
 * The tool name is upper-cased and trimmed before being looked up. The
 * upper-casing uses Locale.ROOT so that the result does not depend on the
 * JVM's default locale (in a Turkish locale, for example, "import" would
 * otherwise become "\u0130MPORT" and fail to match).
 *
 * @param jobData the Sqoop job whose tool is examined
 * @return the matching tool, or UNKNOWN when the tool name has no
 *         counterpart in the enumeration
 */
private OraOopConstants.Sqoop.Tool getSqoopJobType(JobData jobData) {

  OraOopConstants.Sqoop.Tool result = OraOopConstants.Sqoop.Tool.UNKNOWN;

  String sqoopToolName =
      getSqoopToolName(jobData).toUpperCase(java.util.Locale.ROOT).trim();
  try {
    result = OraOopConstants.Sqoop.Tool.valueOf(sqoopToolName);
  } catch (IllegalArgumentException ex) {
    // Not an error: OraOop simply does not handle this tool.
    LOG.debug(String.format(
        "The Sqoop tool name \"%s\" is not supported by OraOop",
        sqoopToolName), ex);
  }
  return result;
}
/**
 * Checks whether an import job uses at least the minimum number of mappers
 * that OraOop accepts. Logs an informational message when it does not.
 *
 * @param sqoopOptions the options of the import job
 * @return true if the job has enough mappers, false otherwise
 */
private boolean isNumberOfImportMappersOkay(SqoopOptions sqoopOptions) {

  int requestedMappers = sqoopOptions.getNumMappers();
  int minimumMappers =
      OraOopUtilities.getMinNumberOfImportMappersAcceptedByOraOop(
          sqoopOptions.getConf());

  if (requestedMappers >= minimumMappers) {
    return true;
  }

  LOG.info(String.format(
      "%s will not process this sqoop connection, as an insufficient number "
          + "of mappers are being used.",
      OraOopConstants.ORAOOP_PRODUCT_NAME));
  return false;
}
/**
 * Checks whether an export job uses at least the minimum number of mappers
 * that OraOop accepts. Logs an informational message when it does not.
 *
 * @param sqoopOptions the options of the export job
 * @return true if the job has enough mappers, false otherwise
 */
private boolean isNumberOfExportMappersOkay(SqoopOptions sqoopOptions) {

  int requestedMappers = sqoopOptions.getNumMappers();
  int minimumMappers =
      OraOopUtilities.getMinNumberOfExportMappersAcceptedByOraOop(
          sqoopOptions.getConf());

  if (requestedMappers >= minimumMappers) {
    return true;
  }

  LOG.info(String.format(
      "%s will not process this sqoop connection, as an insufficient number "
          + "of mappers are being used.",
      OraOopConstants.ORAOOP_PRODUCT_NAME));
  return false;
}
/**
 * Reports whether the import job names a table.
 *
 * @param sqoopOptions the options of the import job
 * @return true if a non-empty table name has been specified
 */
private boolean isSqoopImportJobTableBased(SqoopOptions sqoopOptions) {
  String table = sqoopOptions.getTableName();
  if (table == null) {
    return false;
  }
  return !table.isEmpty();
}
/**
 * Determines whether the object the job refers to is an Oracle table.
 *
 * The table is looked up directly first. If that finds nothing, the type of
 * the named object is queried so the user can be told what went wrong: the
 * object either does not exist for that owner, or is something other than
 * a table. If the database cannot be queried, the object is assumed to be
 * a table.
 *
 * @param connection connection used to query the database
 * @param connectionUserName user name shown in the hint to the user
 * @param tableContext schema and name of the object to check
 * @return true if the object is (or is assumed to be) an Oracle table
 */
private boolean isSqoopTableAnOracleTable(Connection connection,
    String connectionUserName, OracleTable tableContext) {

  try {
    // First try to find the object as a table...
    OracleTable foundTable =
        OraOopOracleQueries.getTable(connection, tableContext.getSchema(),
            tableContext.getName());
    if (foundTable != null) {
      return true;
    }

    // Not found as a table, so work out what kind of object the user was
    // referring to. Perhaps they've specified the name of a view?...
    String objectType =
        OraOopOracleQueries.getOracleObjectType(connection, tableContext);

    if (objectType == null) {
      LOG.info(String.format(
          "%1$s will not process this Sqoop connection, "
              + "as the Oracle user %2$s does not own a table named %3$s.\n"
              + "\tPlease prefix the table name with the owner.\n "
              + "\tNote: You may need to double-quote the owner and/or table name."
              + "\n\tE.g. sqoop ... --username %4$s --table %2$s.%3$s\n",
          OraOopConstants.ORAOOP_PRODUCT_NAME, tableContext.getSchema(),
          tableContext.getName(), connectionUserName));
      return false;
    }

    boolean isTable =
        objectType.equalsIgnoreCase(OraOopConstants.Oracle.OBJECT_TYPE_TABLE);
    if (!isTable) {
      LOG.info(String.format("%s will not process this sqoop connection, "
          + "as %s is not an Oracle table, it's a %s.",
          OraOopConstants.ORAOOP_PRODUCT_NAME, tableContext.toString(),
          objectType));
    }
    return isTable;

  } catch (SQLException ex) {
    LOG.warn(String.format(
        "Unable to determine the Oracle-type of the object named %s owned by "
            + "%s.\nError:\n" + "%s", tableContext.getName(), tableContext
            .getSchema(), ex.getMessage()));

    // In the absence of conflicting information, let's assume the object is
    // actually a table...
    return true;
  }
}
/**
 * Determines whether the given table is an index-organized table.
 *
 * If the database cannot be queried, a warning is logged and the table is
 * treated as not being index-organized.
 *
 * @param connection connection used to query the database
 * @param tableContext the table to check
 * @return true if the database reports the table as index-organized;
 *         false otherwise, including when the query fails
 */
private boolean isSqoopTableAnIndexOrganizedTable(Connection connection,
    OracleTable tableContext) {

  try {
    return OraOopOracleQueries.isTableAnIndexOrganizedTable(connection,
        tableContext);
  } catch (SQLException ex) {
    LOG.warn(String.format(
        "Unable to determine whether the Oracle table %s is an index-organized"
            + " table.\nError:\n" + "%s", tableContext.toString(), ex.getMessage()));
    return false;
  }
}
/**
 * Returns the name of the Sqoop tool running the given job.
 *
 * @param jobData the Sqoop job
 * @return the tool name as reported by the job's Sqoop tool
 */
private String getSqoopToolName(JobData jobData) {
  final String toolName = jobData.getSqoopTool().getToolName();
  return toolName;
}
/**
 * Builds the "action" name assigned to the Oracle sessions of this job.
 *
 * The name consists of the Sqoop tool name followed by a timestamp, cut
 * down to at most 32 characters.
 *
 * This method assumes that OraOop only processes Sqoop "import" and
 * "export" jobs, and that a table (not a query) is being imported or
 * exported.
 *
 * @param jobData the Sqoop job
 * @return the action name, never longer than 32 characters
 * @throws UnsupportedOperationException if the job is neither an import nor
 *           an export
 */
private String getOracleSessionActionName(JobData jobData) {

  boolean isImport =
      getSqoopJobType(jobData) == OraOopConstants.Sqoop.Tool.IMPORT;
  if (!isImport
      && getSqoopJobType(jobData) != OraOopConstants.Sqoop.Tool.EXPORT) {
    // NOTE(review): getCurrentMethodName() presumably reports the method it
    // is called from, so the call is kept directly in this method - confirm.
    throw new UnsupportedOperationException(String.format(
        "%s needs to be updated to cope " + "with Sqoop jobs of type %s.",
        OraOopUtilities.getCurrentMethodName(), getSqoopToolName(jobData)));
  }

  SimpleDateFormat timestampFormat =
      new SimpleDateFormat("yyyyMMddHHmmsszzz");
  String timestamp = timestampFormat.format(new Date());

  String actionName =
      String.format("%s %s", getSqoopToolName(jobData), timestamp);

  // NOTE: The "action" column of v$session is only a 32 character column,
  // so anything longer has to be cut down to fit.
  final int maxActionLength = 32;
  if (actionName.length() <= maxActionLength) {
    return actionName;
  }
  return actionName.substring(0, maxActionLength).trim();
}
/**
 * Reports whether the job is an incremental import. Logs an informational
 * message when it is, as OraOop does not handle incremental imports.
 *
 * @param jobData the Sqoop job
 * @return true if the job's incremental mode is anything other than None
 */
private boolean isSqoopImportIncremental(JobData jobData) {

  IncrementalMode mode = jobData.getSqoopOptions().getIncrementalMode();
  if (mode == IncrementalMode.None) {
    return false;
  }

  LOG.info(String.format("%1$s will not process this sqoop connection, "
      + "as incremental mode is not supported by %1$s.",
      OraOopConstants.ORAOOP_PRODUCT_NAME));
  return true;
}
/**
 * Logs a banner announcing that OraOop is being used: the text
 * "Using <product name>" framed above and below by a row of asterisks.
 */
private void showUserTheOraOopWelcomeMessage() {

  String bannerText =
      String.format("Using %s", OraOopConstants.ORAOOP_PRODUCT_NAME);
  int bannerWidth = bannerText.length();
  bannerText = OraOopUtilities.padRight(bannerText, bannerWidth);

  // The border is 8 characters wider than the text, matching the
  // "*** " prefix and " ***" suffix placed around it.
  char[] borderChars = new char[bannerWidth + 8];
  Arrays.fill(borderChars, '*');
  String border = new String(borderChars);

  LOG.info(String.format("\n" + "%1$s\n" + "*** %2$s ***\n" + "%1$s",
      border, bannerText));
}
/**
 * Logs the PL/SQL block that kills this job's Oracle sessions.
 *
 * The hint is only shown when the maximum number of map task attempts is
 * exactly 1. With any other setting, killing the Oracle session is futile
 * because the task would be reattempted.
 *
 * @param sqoopOptions the options of the accepted job
 */
private void showUserTheOracleCommandToKillOraOop(SqoopOptions sqoopOptions) {

  Configuration jobConf = sqoopOptions.getConf();

  int maxTaskAttempts = jobConf.getInt(HADOOP_MAP_TASK_MAX_ATTEMTPS, -1);
  if (maxTaskAttempts == 1) {
    String sessionModule = OraOopConstants.ORACLE_SESSION_MODULE_NAME;
    String sessionAction =
        sqoopOptions.getConf().get(OraOopConstants.ORACLE_SESSION_ACTION_NAME);

    LOG.info(String.format(
        "\nNote: This %s job can be killed via Oracle by executing the "
            + "following statement:\n\tbegin\n"
            + "\t\tfor row in (select sid,serial# from v$session where module='%s' "
            + "and action='%s') loop\n"
            + "\t\t\texecute immediate 'alter system kill session ''' || row.sid || "
            + "',' || row.serial# || '''';\n"
            + "\t\tend loop;\n" + "\tend;",
        OraOopConstants.ORAOOP_PRODUCT_NAME, sessionModule, sessionAction));
  }
}
private void createAnyRequiredOracleObjects(SqoopOptions sqoopOptions,
Connection connection, OraOopConnManager oraOopConnManager,
List<OraOopLogMessage> messagesToDisplayAfterWelcome) throws SQLException {
Configuration conf = sqoopOptions.getConf();
// The SYSDATE on the Oracle database will be used as the partition value
// for this export job...
Object sysDateTime = OraOopOracleQueries.getSysDate(connection);
String sysDateStr =
OraOopOracleQueries.oraDATEToString(sysDateTime, "yyyy-mm-dd hh24:mi:ss");
OraOopUtilities.rememberOracleDateTime(conf,
OraOopConstants.ORAOOP_JOB_SYSDATE, sysDateStr);
checkForOldOraOopTemporaryOracleTables(connection, sysDateTime,
OraOopOracleQueries.getCurrentSchema(connection),
messagesToDisplayAfterWelcome);
// Store the actual partition value, so the N mappers know what value to
// insert...
String partitionValue =
OraOopOracleQueries.oraDATEToString(sysDateTime,
OraOopConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
conf.set(OraOopConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE,
partitionValue);
// Generate the (22 character) partition name...
String partitionName =
OraOopUtilities
.createExportTablePartitionNameFromOracleTimestamp(sysDateTime);
int numMappers = sqoopOptions.getNumMappers();
String exportTableTemplate =
conf.get(OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE, "");
String user = sqoopOptions.getUsername();
if (user == null) {
user = OracleManager.getSessionUser(connection);
}
OracleTable templateTableContext =
OraOopUtilities.decodeOracleTableName(user, exportTableTemplate);
boolean noLoggingOnNewTable =
conf.getBoolean(OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_NO_LOGGING,
false);
String updateKeyCol = sqoopOptions.getUpdateKeyCol();
/* =========================== */
/* VALIDATION OF INPUTS */
/* =========================== */
if (updateKeyCol == null || updateKeyCol.isEmpty()) {
// We're performing an "insert" export, not an "update" export.
// Check that the "oraoop.export.merge" property has not been specified,
// as this would be
// an invalid scenario...
if (OraOopUtilities.getExportUpdateMode(conf) == UpdateMode.Merge) {
throw new RuntimeException(String.format(
"\n\nThe option \"%s\" can only be used if \"%s\" is "
+ "also used.\n", OraOopConstants.ORAOOP_EXPORT_MERGE,
"--update-key"));
}
}
if (OraOopUtilities
.userWantsToCreatePartitionedExportTableFromTemplate(conf)
|| OraOopUtilities
.userWantsToCreateNonPartitionedExportTableFromTemplate(conf)) {
// OraOop will create the export table.
if (oraOopConnManager.getOracleTableContext().getName().length()
> OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH) {
String msg =
String.format(
"The Oracle table name \"%s\" is longer than %d characters.\n"
+ "Oracle will not allow a table with this name to be created.",
oraOopConnManager.getOracleTableContext().getName(),
OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH);
throw new RuntimeException(msg);
}
if (updateKeyCol != null && !updateKeyCol.isEmpty()) {
// We're performing an "update" export, not an "insert" export.
// Check whether the user is attempting an "update" (i.e. a non-merge).
// If so, they're
// asking to only UPDATE rows in a (about to be created) (empty) table
// that contains no rows.
// This will be a waste of time, as we'd be attempting to perform UPDATE
// operations against a
// table with no rows in it...
UpdateMode updateMode = OraOopUtilities.getExportUpdateMode(conf);
if (updateMode == UpdateMode.Update) {
throw new RuntimeException(String.format(
"\n\nCombining the option \"%s\" with the option \"%s=false\" is "
+ "nonsensical, as this would create an "
+ "empty table and then perform "
+ "a lot of work that results in a table containing no rows.\n",
OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE,
OraOopConstants.ORAOOP_EXPORT_MERGE));
}
}
// Check that the specified template table actually exists and is a
// table...
String templateTableObjectType =
OraOopOracleQueries.getOracleObjectType(connection,
templateTableContext);
if (templateTableObjectType == null) {
throw new RuntimeException(String.format(
"The specified Oracle template table \"%s\" does not exist.",
templateTableContext.toString()));
}
if (!templateTableObjectType
.equalsIgnoreCase(OraOopConstants.Oracle.OBJECT_TYPE_TABLE)) {
throw new RuntimeException(
String.format(
"The specified Oracle template table \"%s\" is not an "
+ "Oracle table, it's a %s.",
templateTableContext.toString(), templateTableObjectType));
}
if (conf.getBoolean(OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_DROP,
false)) {
OraOopOracleQueries.dropTable(connection, oraOopConnManager
.getOracleTableContext());
}
// Check that there is no existing database object with the same name of
// the table to be created...
String newTableObjectType =
OraOopOracleQueries.getOracleObjectType(connection, oraOopConnManager
.getOracleTableContext());
if (newTableObjectType != null) {
throw new RuntimeException(
String.format(
"%s cannot create a new Oracle table named %s as a \"%s\" "
+ "with this name already exists.",
OraOopConstants.ORAOOP_PRODUCT_NAME, oraOopConnManager
.getOracleTableContext().toString(), newTableObjectType));
}
} else {
// The export table already exists.
if (updateKeyCol != null && !updateKeyCol.isEmpty()) {
// We're performing an "update" export, not an "insert" export.
// Check that there exists an index on the export table on the
// update-key column(s).
// Without such an index, this export may perform like a real dog...
String[] updateKeyColumns =
OraOopUtilities.getExportUpdateKeyColumnNames(sqoopOptions);
if (!OraOopOracleQueries.doesIndexOnColumnsExist(connection,
oraOopConnManager.getOracleTableContext(), updateKeyColumns)) {
String msg = String.format(
"\n**************************************************************"
+ "***************************************************************"
+ "\n\tThe table %1$s does not have a valid index on "
+ "the column(s) %2$s.\n"
+ "\tAs a consequence, this export may take a long time to "
+ "complete.\n"
+ "\tIf performance is unacceptable, consider reattempting this "
+ "job after creating an index "
+ "on this table via the SQL...\n"
+ "\t\tcreate index <index_name> on %1$s(%2$s);\n"
+ "****************************************************************"
+ "*************************************************************",
oraOopConnManager.getOracleTableContext().toString(),
OraOopUtilities.stringArrayToCSV(updateKeyColumns));
messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.WARN, msg));
}
}
}
/* ================================= */
/* CREATE A PARTITIONED TABLE */
/* ================================= */
if (OraOopUtilities
.userWantsToCreatePartitionedExportTableFromTemplate(conf)) {
// Create a new Oracle table using the specified template...
String[] subPartitionNames =
OraOopUtilities.generateExportTableSubPartitionNames(numMappers,
sysDateTime, conf);
// Create the export table from a template table...
String tableStorageClause =
OraOopUtilities.getExportTableStorageClause(conf);
OraOopOracleQueries.createExportTableFromTemplateWithPartitioning(
connection, oraOopConnManager.getOracleTableContext(),
tableStorageClause, templateTableContext, noLoggingOnNewTable,
partitionName, sysDateTime, sqoopOptions.getNumMappers(),
subPartitionNames);
return;
}
/* ===================================== */
/* CREATE A NON-PARTITIONED TABLE */
/* ===================================== */
if (OraOopUtilities
.userWantsToCreateNonPartitionedExportTableFromTemplate(conf)) {
String tableStorageClause =
OraOopUtilities.getExportTableStorageClause(conf);
OraOopOracleQueries.createExportTableFromTemplate(connection,
oraOopConnManager.getOracleTableContext(), tableStorageClause,
templateTableContext, noLoggingOnNewTable);
return;
}
/* ===================================================== */
/* ADD ADDITIONAL PARTITIONS TO AN EXISTING TABLE */
/* ===================================================== */
// If the export table is partitioned, and the partitions were created by
// OraOop, then we need to create additional partitions...
OracleTablePartitions tablePartitions =
OraOopOracleQueries.getPartitions(connection, oraOopConnManager
.getOracleTableContext());
// Find any partition name starting with "ORAOOP_"...
OracleTablePartition oraOopPartition =
tablePartitions.findPartitionByRegEx("^"
+ OraOopConstants.EXPORT_TABLE_PARTITION_NAME_PREFIX);
if (tablePartitions.size() > 0 && oraOopPartition == null) {
for (int idx = 0; idx < tablePartitions.size(); idx++) {
messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.INFO, String.format(
"The Oracle table %s has a partition named \"%s\".",
oraOopConnManager.getOracleTableContext().toString(),
tablePartitions.get(idx).getName())));
}
messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.WARN, String.format(
"The Oracle table %s is partitioned.\n"
+ "These partitions were not created by %s.",
oraOopConnManager.getOracleTableContext().toString(),
OraOopConstants.ORAOOP_PRODUCT_NAME)));
}
if (oraOopPartition != null) {
// Indicate in the configuration what's happening...
conf.setBoolean(OraOopConstants.EXPORT_TABLE_HAS_ORAOOP_PARTITIONS, true);
messagesToDisplayAfterWelcome
.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.INFO,
String
.format(
"The Oracle table %s is partitioned.\n"
+ "These partitions were created by %s, so "
+ "additional partitions will now be created.\n"
+ "The name of the new partition will be \"%s\".",
oraOopConnManager.getOracleTableContext().toString(),
OraOopConstants.ORAOOP_PRODUCT_NAME, partitionName)));
String[] subPartitionNames =
OraOopUtilities.generateExportTableSubPartitionNames(numMappers,
sysDateTime, conf);
// Add another partition (and N subpartitions) to this existing,
// partitioned export table...
OraOopOracleQueries.createMoreExportTablePartitions(connection,
oraOopConnManager.getOracleTableContext(), partitionName,
sysDateTime, subPartitionNames);
return;
}
}
/**
 * Looks for leftover temporary export tables in the given schema and, if any
 * are more than one day older than {@code sysDateTime}, queues an INFO message
 * listing them so the user can drop them.
 *
 * <p>This is a best-effort check: any exception raised while performing it is
 * caught and reported as a WARN message instead of being propagated.
 *
 * @param connection the Oracle connection used to query for candidate tables
 * @param sysDateTime the Oracle date-time value used both to generate the
 *        table-name pattern and as the reference point for table age
 * @param schema the schema to search for temporary tables
 * @param messagesToDisplayAfterWelcome the list that receives the resulting
 *        INFO (old tables found) or WARN (check failed) message
 */
private void checkForOldOraOopTemporaryOracleTables(Connection connection,
    Object sysDateTime, String schema,
    List<OraOopLogMessage> messagesToDisplayAfterWelcome) {

  // Deliberately a double: dividing the millisecond difference by an integer
  // constant would truncate to whole days, so a table that is 1.5 days old
  // would be computed as exactly 1 day and never satisfy the "> 1.0" test.
  final double millisPerDay = 24.0 * 60 * 60 * 1000;

  try {
    StringBuilder message = new StringBuilder();
    message
        .append(String.format(
            "The following tables appear to be old temporary tables created by "
            + "%s that have not been deleted.\n"
            + "They are probably left over from jobs that encountered an error and "
            + "could not clean up after themselves.\n"
            + "You might want to drop these Oracle tables in order to reclaim "
            + "Oracle storage space:\n", OraOopConstants.ORAOOP_PRODUCT_NAME));
    boolean showMessage = false;

    // Turn the name generated for mapper 0 into a LIKE pattern: every digit
    // becomes a '%' wildcard, and "%%" is then replaced with a single "%".
    String generatedTableName =
        OraOopUtilities.generateExportTableMapperTableName(0, sysDateTime,
            schema).getName();
    generatedTableName = generatedTableName.replaceAll("[0-9]", "%");
    generatedTableName =
        OraOopUtilities.replaceAll(generatedTableName, "%%", "%");

    Date sysDate = OraOopOracleQueries.oraDATEToDate(sysDateTime);

    List<OracleTable> tables =
        OraOopOracleQueries.getTablesWithTableNameLike(connection, schema,
            generatedTableName);

    for (OracleTable oracleTable : tables) {
      OraOopUtilities.DecodedExportMapperTableName tableName =
          OraOopUtilities.decodeExportTableMapperTableName(oracleTable);
      // A null result means the name could not be decoded; skip that table.
      if (tableName == null) {
        continue;
      }

      Date tableDate =
          OraOopOracleQueries.oraDATEToDate(tableName.getTableDateTime());
      double daysApart =
          (sysDate.getTime() - tableDate.getTime()) / millisPerDay;
      if (daysApart > 1.0) {
        showMessage = true;
        message.append(String.format("\t%s\n", oracleTable.toString()));
      }
    }

    if (showMessage) {
      messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
          OraOopConstants.Logging.Level.INFO, message.toString()));
    }
  } catch (Exception ex) {
    // Best-effort housekeeping only: report the failure, don't fail the job.
    messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
        OraOopConstants.Logging.Level.WARN, String.format(
            "%s was unable to check for the existance of old "
                + "temporary Oracle tables.\n" + "Error:\n%s",
            OraOopConstants.ORAOOP_PRODUCT_NAME, ex.toString())));
  }
}
}