blob: 6715508e89b16dae9361a72dfc970ce83131b25d [file] [log] [blame]
package org.apache.hawq.pxf.plugins.jdbc;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Plugin;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* JDBC tables plugin (base class)
*
* Implemented subclasses: {@link JdbcAccessor}, {@link JdbcResolver}.
*/
public class JdbcPlugin extends Plugin {
/**
* Class constructor
*
* @param input {@link InputData} provided by PXF
*
* @throws UserDataException if one of the required request parameters is not set
*/
public JdbcPlugin(InputData input) throws UserDataException {
super(input);
jdbcDriver = input.getUserProperty("JDBC_DRIVER");
if (jdbcDriver == null) {
throw new UserDataException("JDBC_DRIVER is a required parameter");
}
dbUrl = input.getUserProperty("DB_URL");
if (dbUrl == null) {
throw new UserDataException("DB_URL is a required parameter");
}
tableName = input.getDataSource();
if (tableName == null) {
throw new UserDataException("Data source must be provided");
}
/*
At the moment, when writing into some table, the table name is
concatenated with a special string that is necessary to write into HDFS.
However, a raw table name is necessary in case of JDBC.
The correct table name is extracted here.
*/
Matcher matcher = tableNamePattern.matcher(tableName);
if (matcher.matches()) {
inputData.setDataSource(matcher.group(1));
tableName = input.getDataSource();
}
columns = inputData.getTupleDescription();
if (columns == null) {
throw new UserDataException("Tuple description must be provided");
}
// This parameter is not required. The default value is null
user = input.getUserProperty("USER");
if (user != null) {
pass = input.getUserProperty("PASS");
}
// This parameter is not required. The default value is 0
String batchSizeRaw = input.getUserProperty("BATCH_SIZE");
if (batchSizeRaw != null) {
try {
batchSize = Integer.parseInt(batchSizeRaw);
if (batchSize < 1) {
throw new NumberFormatException();
} else if (batchSize == 0) {
batchSize = 1;
}
batchSizeIsSetByUser = true;
}
catch (NumberFormatException e) {
throw new UserDataException("BATCH_SIZE is incorrect: must be a non-negative integer");
}
}
// This parameter is not required. The default value is 1
String poolSizeRaw = input.getUserProperty("POOL_SIZE");
if (poolSizeRaw != null) {
try {
poolSize = Integer.parseInt(poolSizeRaw);
}
catch (NumberFormatException e) {
throw new UserDataException("POOL_SIZE is incorrect: must be an integer");
}
}
}
/**
* Open a new JDBC connection
*
* @throws ClassNotFoundException if the JDBC driver was not found
* @throws SQLException if a database access error occurs
* @throws SQLTimeoutException if a connection problem occurs
*/
public Connection getConnection() throws ClassNotFoundException, SQLException, SQLTimeoutException {
Connection connection;
if (LOG.isDebugEnabled()) {
if (user != null) {
LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, user=%s, pass=%s, table=%s",
jdbcDriver, dbUrl, user, pass, tableName));
}
else {
LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, table=%s",
jdbcDriver, dbUrl, tableName));
}
}
Class.forName(jdbcDriver);
if (user != null) {
connection = DriverManager.getConnection(dbUrl, user, pass);
}
else {
connection = DriverManager.getConnection(dbUrl);
}
return connection;
}
/**
* Close a JDBC connection
*/
public static void closeConnection(Connection connection) {
try {
if ((connection != null) && (!connection.isClosed())) {
if ((connection.getMetaData().supportsTransactions()) && (!connection.getAutoCommit())) {
connection.commit();
}
connection.close();
}
}
catch (SQLException e) {
LOG.error("JDBC connection close error", e);
}
}
/**
* Prepare a JDBC PreparedStatement
*
* @throws ClassNotFoundException if the JDBC driver was not found
* @throws SQLException if a database access error occurs
* @throws SQLTimeoutException if a connection problem occurs
*/
public PreparedStatement getPreparedStatement(Connection connection, String query) throws SQLException, SQLTimeoutException, ClassNotFoundException {
if ((connection == null) || (query == null)) {
throw new IllegalArgumentException("The provided query or connection is null");
}
if (connection.getMetaData().supportsTransactions()) {
connection.setAutoCommit(false);
}
return connection.prepareStatement(query);
}
/**
* Close a JDBC Statement (and the connection it is based on)
*/
public static void closeStatement(Statement statement) {
if (statement == null) {
return;
}
Connection connection = null;
try {
if (!statement.isClosed()) {
connection = statement.getConnection();
statement.close();
}
}
catch (Exception e) {}
closeConnection(connection);
}
// JDBC parameters
protected String jdbcDriver = null;
protected String dbUrl = null;
protected String user = null;
protected String pass = null;
protected String tableName = null;
// '100' is a recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
public static final int DEFAULT_BATCH_SIZE = 100;
// After argument parsing, this value is guaranteed to be >= 1
protected int batchSize = DEFAULT_BATCH_SIZE;
protected boolean batchSizeIsSetByUser = false;
protected int poolSize = 1;
// Columns description
protected ArrayList<ColumnDescriptor> columns = null;
private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
// At the moment, when writing into some table, the table name is concatenated with a special string that is necessary to write into HDFS. However, a raw table name is necessary in case of JDBC. This Pattern allows to extract the correct table name from the given InputData.dataSource
private static final Pattern tableNamePattern = Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");
}