blob: c757466386826aa8ce0b4ae87623b255bc6ef2bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pipeline.transform.jdbcmetadata;
import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.sql.*;
import java.util.List;
import org.apache.hop.core.database.Database;
import org.apache.hop.core.database.DatabaseMeta;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.logging.LogLevel;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.RowMeta;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.transform.BaseTransform;
import org.apache.hop.pipeline.transform.TransformMeta;
public class JdbcMetadata extends BaseTransform<JdbcMetadataMeta, JdbcMetadataData> {
boolean treatAsInputTransform = false;
public JdbcMetadata(
TransformMeta transformMeta,
JdbcMetadataMeta meta,
JdbcMetadataData data,
int copyNr,
PipelineMeta pipelineMeta,
Pipeline pipeline) {
super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
}
private Object stringToArgumentValue(String stringValue, Class<?> type)
throws IllegalArgumentException {
Object argument;
if (type == String.class) {
argument = stringValue;
} else if (type == Boolean.class) {
argument = "Y".equals(stringValue) ? Boolean.TRUE : Boolean.FALSE;
} else if (type == Integer.class) {
argument = Integer.valueOf(stringValue);
} else {
throw new IllegalArgumentException("Can't handle valueType " + type.getName());
}
return argument;
}
private Object stringListToObjectArray(String stringValue, Class<?> type) {
if (stringValue == null) return null;
String[] stringValues = stringValue.split(",");
int n = stringValues.length;
Object result = Array.newInstance(type, n);
for (int i = 0; i < n; i++) {
Array.set(result, i, stringToArgumentValue(stringValues[i], type));
}
return result;
}
/**
* Utility to set up the method and its arguments
*
* @param meta
* @param data
* @throws Exception
*/
private void initMethod(JdbcMetadataMeta meta, JdbcMetadataData data) throws Exception {
// Set up the method to call
logDebug("Setting up method to call.");
Method method = meta.getMethod();
data.method = method;
// Try to set up the arguments for the method
logDebug("Setting up method arguments.");
Class<?>[] argumentTypes = method.getParameterTypes();
int argc = argumentTypes.length;
logDebug("Method has " + argc + " arguments.");
// If no arguments, treat it as an input transform and run it without any providing any input
// stream
treatAsInputTransform = argc == 0;
String[] arguments = new String[meta.getArguments().size()];
meta.getArguments().toArray(arguments);
logDebug("We expected " + arguments.length + " arguments' values.");
if (argc != arguments.length) {
throw new Exception(
"Method has a " + argc + " arguments, we expected " + arguments.length + " values.");
}
logDebug("Allocating arguments array.");
data.arguments = new Object[argc];
String stringArgument;
if (meta.isArgumentSourceFields()) {
// arguments are specified by values coming from fields.
// we don't know about the fields yet, so we
// initialize with bogus value so we can check if all fields were found
logDebug("Allocating field indices array for arguments.");
data.argumentFieldIndices = new int[argc];
for (int i = 0; i < argc; i++) {
data.argumentFieldIndices[i] = -1;
}
} else {
// arguments are specified directly by the user in the step.
// here we convert the string values into proper argument values
Class<?> argumentType;
Object argument;
for (int i = 0; i < argc; i++) {
argumentType = argumentTypes[i];
stringArgument = arguments[i];
if (stringArgument == null) {
argument = null;
} else {
stringArgument = variables.resolve(stringArgument);
if (argumentType.isArray()) {
if (stringArgument.length() == 0) {
argument = null;
} else {
argument = stringListToObjectArray(stringArgument, argumentType.getComponentType());
}
} else {
argument = stringToArgumentValue(stringArgument, argumentType);
}
}
data.arguments[i] = argument;
}
}
}
private void initOutputFields(JdbcMetadataMeta meta, JdbcMetadataData data) {
List<OutputField> outputFields = meta.getOutputFields();
int n = outputFields.size();
data.resultSetIndices = new int[n];
IValueMeta[] fields = meta.getMethodResultSetDescriptor();
int m = fields.length;
for (int i = 0; i < n; i++) {
String fieldName = outputFields.get(i).getName();
if (fieldName == null) {
continue;
}
for (int j = 0; j < m; j++) {
IValueMeta field = fields[j];
if (!fieldName.equals(field.getName())) continue;
data.resultSetIndices[i] = j + 1;
break;
}
}
}
@Override
public boolean init() {
boolean result = true;
if (!super.init()) return false;
try {
DatabaseMeta databaseMeta = getPipelineMeta().findDatabase(meta.getConnection(), variables);
if (databaseMeta == null) {
logError("Database connection is missing for transform " + getTransformName());
return false;
}
data.db = new Database(this, this, databaseMeta);
data.db.connect();
initMethod(meta, data);
initOutputFields(meta, data);
} catch (Exception exception) {
logError(
"Unexpected "
+ exception.getClass().getName()
+ " initializing step: "
+ exception.getMessage());
exception.printStackTrace();
result = false;
}
return result;
}
/**
* This is called in the processRow function to get the actual arguments for the jdbc metadata
* method
*
* @param meta
* @param data
* @param row
* @throws Exception
*/
private void prepareMethodArguments(JdbcMetadataMeta meta, JdbcMetadataData data, Object[] row)
throws Exception {
// if the arguments are not from fields, then we already took care of it in the init phase, so
// leave
if (!meta.isArgumentSourceFields()) return;
// if the arguments are from fields, then we already stored the right indices to take the values
// from
Object[] args = data.arguments;
int[] indices = data.argumentFieldIndices;
int index;
Class<?>[] argumentTypes = data.method.getParameterTypes();
Class<?> argumentType;
Object argument;
for (int i = 0; i < args.length; i++) {
index = indices[i];
if (index == -2) {
argument = null;
} else {
argument = row[index];
}
argumentType = argumentTypes[i];
if (argumentType.isArray() && argument != null) {
if ("".equals(argument)) {
logDebug("Converted empty string to null for argument array");
argument = null;
} else {
argument = stringListToObjectArray((String) argument, argumentType.getComponentType());
}
}
args[i] = argument;
}
}
private Object[] createOutputRow(
JdbcMetadataMeta meta, JdbcMetadataData data, Object[] inputRow) {
Object[] outputRow = new Object[data.outputRowMeta.size()];
if (!treatAsInputTransform) {
if (data.inputFieldsToCopy == null) {
System.arraycopy(inputRow, 0, outputRow, 0, getInputRowMeta().size());
} else {
for (int i = 0; i < data.inputFieldsToCopy.length; i++) {
outputRow[i] = inputRow[data.inputFieldsToCopy[i]];
}
}
}
return outputRow;
}
@Override
public boolean processRow() throws HopException {
// get incoming row, getRow() potentially blocks waiting for more rows, returns null if no more
// rows expected
Object[] r = getRow();
// if no more rows are expected, indicate step is finished and processRow() should not be called
// again
if (!treatAsInputTransform && r == null) {
setOutputDone();
return false;
}
// the "first" flag is inherited from the base step implementation
// it is used to guard some processing tasks, like figuring out field indexes
// in the row structure that only need to be done once
if (first) {
if (!treatAsInputTransform) {
first = false;
IRowMeta inputRowMeta = getInputRowMeta();
data.outputRowOffset = inputRowMeta.size();
boolean argumentSourceFields = meta.isArgumentSourceFields();
logDebug("Looking up indices of input fields.");
String[] fieldNames = inputRowMeta.getFieldNames();
logDebug("We have " + fieldNames.length + " input fields.");
List<String> arguments = meta.getArguments();
int argc = arguments.size();
// store indices for argument fields
if (argumentSourceFields) {
String stringArgument;
for (int i = 0; i < fieldNames.length; i++) {
String fieldName = fieldNames[i];
logDebug("Looking at field: " + fieldName);
for (int j = 0; j < argc; j++) {
stringArgument = arguments.get(j);
logDebug("Found argument " + j + ": " + stringArgument);
if (fieldName.equals(stringArgument)) {
logDebug("Match, storing index " + i);
data.argumentFieldIndices[j] = i;
}
}
}
// keep track of how many fields we used as args.
// We need this in case remove argument fields is enabled
// as this is the number of fields we need to discard from the input row.
int fieldsUsedAsArgs = 0;
// ensure all argument fields are bound to a valid field
argumentFields:
for (int j = 0; j < argc; j++) {
logDebug("Argument indices at " + j + ": " + data.argumentFieldIndices[j]);
if (data.argumentFieldIndices[j] == -1) {
// this argument does not point to any existing field.
if (arguments.get(j) == null || arguments.get(j).length() == 0) {
// the argument is blank, this is ok: we will pass null instead
data.argumentFieldIndices[j] = -2;
} else {
// this argument is not blank - this means it points to a non-existing field.
// this is an error.
Object[] descriptor = meta.getMethodDescriptor();
Object[] args = (Object[]) descriptor[1];
Object[] arg = (Object[]) args[j];
throw new HopException(
"No field \""
+ arguments.get(j)
+ "\" found for argument "
+ j
+ ": "
+ ((String) arg[0]));
}
} else {
// this argument points to a valid field.
// let's check if this same field was already used as arg:
for (int i = 0; i < j; i++) {
if (data.argumentFieldIndices[i] == data.argumentFieldIndices[j]) {
// yes, it was used already. Let's check the next argument.
continue argumentFields;
}
}
// this field was not used already, so mark it as used.
fieldsUsedAsArgs++;
}
}
if (meta.isRemoveArgumentFields()) {
int n = data.outputRowOffset;
data.outputRowOffset -= fieldsUsedAsArgs;
data.inputFieldsToCopy = new int[data.outputRowOffset];
inputFieldsToCopy:
for (int i = 0, j = 0; i < n; i++) { // for each field in the input row
for (int k = 0; k < argc; k++) { // for each method argument
if (data.argumentFieldIndices[k] == i) {
// this input field is used as argument. Continue to the next field.
continue inputFieldsToCopy;
}
}
// this field was not used as argument. make sure we copy it to the output.
data.inputFieldsToCopy[j++] = i;
}
}
}
logDebug("Done looking up indices of input fields.");
// clone the input row structure and place it in our data object
data.outputRowMeta = getInputRowMeta().clone();
} else {
data.outputRowMeta = new RowMeta();
}
// use meta.getFields() to change it, so it reflects the output row structure
meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);
} // end of first
try {
logRowlevel("Processing 1 input row");
prepareMethodArguments(meta, data, r);
if (getLogLevel() == LogLevel.ROWLEVEL) {
logRowlevel("About to invoke method");
for (int i = 0; i < data.arguments.length; i++) {
logRowlevel(
"Argument "
+ i
+ "; "
+ (data.arguments[i] == null
? "null"
: data.arguments[i].toString()
+ "; "
+ data.arguments[i].getClass().getName()));
}
}
DatabaseMetaData databaseMetaData = data.db.getConnection().getMetaData();
ResultSet resultSet = (ResultSet) data.method.invoke(databaseMetaData, data.arguments);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int columnCount = resultSetMetaData.getColumnCount();
Object value;
boolean outputRows = false;
int k;
while (resultSet.next()) {
logRowlevel("Processing 1 output row.");
Object[] outputRow = createOutputRow(meta, data, r);
for (int i = data.outputRowOffset, j = 0; i < data.outputRowMeta.size(); i++, j++) {
k = data.resultSetIndices[j];
if (k > columnCount) continue;
int type = data.outputRowMeta.getValueMeta(i).getType();
value = getColumnValue(resultSet, resultSetMetaData, k, type);
outputRow[i] = value;
outputRows = true;
}
// put the row to the output row stream
putRow(data.outputRowMeta, outputRow);
logRowlevel("Done processing 1 output row.");
}
resultSet.close();
if (!outputRows && meta.isAlwaysPassInputRow()) {
Object[] outputRow = createOutputRow(meta, data, r);
putRow(data.outputRowMeta, outputRow);
}
logRowlevel("Done processing 1 input row.");
} catch (Exception exception) {
exception.printStackTrace();
if (exception instanceof HopException) {
throw (HopException) exception;
} else {
throw new HopException(exception);
}
}
// log progress if it is time to to so
if (checkFeedback(getLinesRead())) {
logBasic("Linenr " + getLinesRead()); // Some basic logging
}
if (treatAsInputTransform) {
setOutputDone();
return false;
}
// indicate that processRow() should be called again
return true;
}
private static Object getColumnValue(
ResultSet resultSet, ResultSetMetaData resultSetMetaData, int k, int type)
throws SQLException {
Object value;
switch (type) {
case IValueMeta
.TYPE_BOOLEAN: // while the JDBC spec prescribes boolean, not all drivers actually
// can deliver.
boolean v;
switch (resultSetMetaData.getColumnType(k)) {
case Types.INTEGER:
case Types.SMALLINT:
case Types.TINYINT:
v = resultSet.getInt(k) == 1;
break;
default:
v = resultSet.getBoolean(k);
}
value = Boolean.valueOf(v);
break;
case IValueMeta.TYPE_INTEGER:
value = Long.valueOf(resultSet.getInt(k));
break;
default:
value = resultSet.getObject(k);
}
return value;
}
@Override
public void dispose() {
// clean up the database
try {
if (data.database != null) {
data.database.disconnect();
data.connection = null;
data.database = null;
}
} catch (Exception ex) {
logError("Error cleaning up database: " + ex.getMessage());
}
// clean up the connection
try {
if (data.connection != null && !data.connection.isClosed()) {
data.connection.close();
}
} catch (Exception ex) {
logError("Error cleaning up connection: " + ex.getMessage());
}
data.connection = null;
data.arguments = null;
data.method = null;
data.argumentFieldIndices = null;
data.inputFieldsToCopy = null;
data.resultSetIndices = null;
super.dispose();
}
}