/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pipeline.transform.jdbcmetadata;

import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.sql.*;
import java.util.List;
import org.apache.hop.core.database.Database;
import org.apache.hop.core.database.DatabaseMeta;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.logging.LogLevel;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.RowMeta;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.transform.BaseTransform;
import org.apache.hop.pipeline.transform.TransformMeta;

public class JdbcMetadata extends BaseTransform<JdbcMetadataMeta, JdbcMetadataData> {

  boolean treatAsInputTransform = false;

  public JdbcMetadata(
      TransformMeta transformMeta,
      JdbcMetadataMeta meta,
      JdbcMetadataData data,
      int copyNr,
      PipelineMeta pipelineMeta,
      Pipeline pipeline) {
    super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
  }

  private Object stringToArgumentValue(String stringValue, Class<?> type)
      throws IllegalArgumentException {
    Object argument;
    if (type == String.class) {
      argument = stringValue;
    } else if (type == Boolean.class) {
      argument = "Y".equals(stringValue) ? Boolean.TRUE : Boolean.FALSE;
    } else if (type == Integer.class) {
      argument = Integer.valueOf(stringValue);
    } else {
      throw new IllegalArgumentException("Can't handle valueType " + type.getName());
    }
    return argument;
  }

  private Object stringListToObjectArray(String stringValue, Class<?> type) {
    if (stringValue == null) return null;
    String[] stringValues = stringValue.split(",");
    int n = stringValues.length;
    Object result = Array.newInstance(type, n);
    for (int i = 0; i < n; i++) {
      Array.set(result, i, stringToArgumentValue(stringValues[i], type));
    }
    return result;
  }

  /**
   * Utility to set up the method and its arguments
   *
   * @param meta
   * @param data
   * @throws Exception
   */
  private void initMethod(JdbcMetadataMeta meta, JdbcMetadataData data) throws Exception {
    // Set up the method to call
    logDebug("Setting up method to call.");
    Method method = meta.getMethod();
    data.method = method;

    // Try to set up the arguments for the method
    logDebug("Setting up method arguments.");
    Class<?>[] argumentTypes = method.getParameterTypes();
    int argc = argumentTypes.length;
    logDebug("Method has " + argc + " arguments.");
    // If no arguments, treat it as an input transform and run it without any providing any input
    // stream
    treatAsInputTransform = argc == 0;
    String[] arguments = new String[meta.getArguments().size()];
    meta.getArguments().toArray(arguments);
    logDebug("We expected " + arguments.length + " arguments' values.");
    if (argc != arguments.length) {
      throw new Exception(
          "Method has a " + argc + " arguments, we expected " + arguments.length + " values.");
    }
    logDebug("Allocating arguments array.");
    data.arguments = new Object[argc];
    String stringArgument;
    if (meta.isArgumentSourceFields()) {
      // arguments are specified by values coming from fields.
      // we don't know about the fields yet, so we
      // initialize with bogus value so we can check if all fields were found
      logDebug("Allocating field indices array for arguments.");
      data.argumentFieldIndices = new int[argc];
      for (int i = 0; i < argc; i++) {
        data.argumentFieldIndices[i] = -1;
      }
    } else {
      // arguments are specified directly by the user in the step.
      // here we convert the string values into proper argument values
      Class<?> argumentType;
      Object argument;
      for (int i = 0; i < argc; i++) {
        argumentType = argumentTypes[i];
        stringArgument = arguments[i];
        if (stringArgument == null) {
          argument = null;
        } else {
          stringArgument = variables.resolve(stringArgument);
          if (argumentType.isArray()) {
            if (stringArgument.length() == 0) {
              argument = null;
            } else {
              argument = stringListToObjectArray(stringArgument, argumentType.getComponentType());
            }
          } else {
            argument = stringToArgumentValue(stringArgument, argumentType);
          }
        }
        data.arguments[i] = argument;
      }
    }
  }

  private void initOutputFields(JdbcMetadataMeta meta, JdbcMetadataData data) {
    List<OutputField> outputFields = meta.getOutputFields();
    int n = outputFields.size();
    data.resultSetIndices = new int[n];

    IValueMeta[] fields = meta.getMethodResultSetDescriptor();
    int m = fields.length;

    for (int i = 0; i < n; i++) {
      String fieldName = outputFields.get(i).getName();
      if (fieldName == null) {
        continue;
      }
      for (int j = 0; j < m; j++) {
        IValueMeta field = fields[j];
        if (!fieldName.equals(field.getName())) continue;
        data.resultSetIndices[i] = j + 1;
        break;
      }
    }
  }

  @Override
  public boolean init() {
    boolean result = true;
    if (!super.init()) return false;

    try {
      DatabaseMeta databaseMeta = getPipelineMeta().findDatabase(meta.getConnection(), variables);
      if (databaseMeta == null) {
        logError("Database connection is missing for transform " + getTransformName());
        return false;
      }
      data.db = new Database(this, this, databaseMeta);
      data.db.connect();
      initMethod(meta, data);
      initOutputFields(meta, data);
    } catch (Exception exception) {
      logError(
          "Unexpected "
              + exception.getClass().getName()
              + " initializing step: "
              + exception.getMessage());
      exception.printStackTrace();
      result = false;
    }
    return result;
  }

  /**
   * This is called in the processRow function to get the actual arguments for the jdbc metadata
   * method
   *
   * @param meta
   * @param data
   * @param row
   * @throws Exception
   */
  private void prepareMethodArguments(JdbcMetadataMeta meta, JdbcMetadataData data, Object[] row)
      throws Exception {
    // if the arguments are not from fields, then we already took care of it in the init phase, so
    // leave
    if (!meta.isArgumentSourceFields()) return;
    // if the arguments are from fields, then we already stored the right indices to take the values
    // from
    Object[] args = data.arguments;
    int[] indices = data.argumentFieldIndices;
    int index;
    Class<?>[] argumentTypes = data.method.getParameterTypes();
    Class<?> argumentType;
    Object argument;
    for (int i = 0; i < args.length; i++) {
      index = indices[i];
      if (index == -2) {
        argument = null;
      } else {
        argument = row[index];
      }
      argumentType = argumentTypes[i];
      if (argumentType.isArray() && argument != null) {
        if ("".equals(argument)) {
          logDebug("Converted empty string to null for argument array");
          argument = null;
        } else {
          argument = stringListToObjectArray((String) argument, argumentType.getComponentType());
        }
      }
      args[i] = argument;
    }
  }

  private Object[] createOutputRow(
      JdbcMetadataMeta meta, JdbcMetadataData data, Object[] inputRow) {
    Object[] outputRow = new Object[data.outputRowMeta.size()];
    if (!treatAsInputTransform) {
      if (data.inputFieldsToCopy == null) {
        System.arraycopy(inputRow, 0, outputRow, 0, getInputRowMeta().size());
      } else {
        for (int i = 0; i < data.inputFieldsToCopy.length; i++) {
          outputRow[i] = inputRow[data.inputFieldsToCopy[i]];
        }
      }
    }
    return outputRow;
  }

  @Override
  public boolean processRow() throws HopException {

    // get incoming row, getRow() potentially blocks waiting for more rows, returns null if no more
    // rows expected
    Object[] r = getRow();

    // if no more rows are expected, indicate step is finished and processRow() should not be called
    // again
    if (!treatAsInputTransform && r == null) {
      setOutputDone();
      return false;
    }

    // the "first" flag is inherited from the base step implementation
    // it is used to guard some processing tasks, like figuring out field indexes
    // in the row structure that only need to be done once
    if (first) {
      if (!treatAsInputTransform) {
        first = false;
        IRowMeta inputRowMeta = getInputRowMeta();
        data.outputRowOffset = inputRowMeta.size();
        boolean argumentSourceFields = meta.isArgumentSourceFields();
        logDebug("Looking up indices of input fields.");
        String[] fieldNames = inputRowMeta.getFieldNames();
        logDebug("We have " + fieldNames.length + " input fields.");
        List<String> arguments = meta.getArguments();
        int argc = arguments.size();
        // store indices for argument fields
        if (argumentSourceFields) {
          String stringArgument;
          for (int i = 0; i < fieldNames.length; i++) {
            String fieldName = fieldNames[i];
            logDebug("Looking at field: " + fieldName);
            for (int j = 0; j < argc; j++) {
              stringArgument = arguments.get(j);
              logDebug("Found argument " + j + ": " + stringArgument);
              if (fieldName.equals(stringArgument)) {
                logDebug("Match, storing index " + i);
                data.argumentFieldIndices[j] = i;
              }
            }
          }
          // keep track of how many fields we used as args.
          // We need this in case remove argument fields is enabled
          // as this is the number of fields we need to discard from the input row.
          int fieldsUsedAsArgs = 0;
          // ensure all argument fields are bound to a valid field
          argumentFields:
          for (int j = 0; j < argc; j++) {
            logDebug("Argument indices at " + j + ": " + data.argumentFieldIndices[j]);
            if (data.argumentFieldIndices[j] == -1) {
              // this argument does not point to any existing field.
              if (arguments.get(j) == null || arguments.get(j).length() == 0) {
                // the argument is blank, this is ok: we will pass null instead
                data.argumentFieldIndices[j] = -2;
              } else {
                // this argument is not blank - this means it points to a non-existing field.
                // this is an error.
                Object[] descriptor = meta.getMethodDescriptor();
                Object[] args = (Object[]) descriptor[1];
                Object[] arg = (Object[]) args[j];
                throw new HopException(
                    "No field \""
                        + arguments.get(j)
                        + "\" found for argument "
                        + j
                        + ": "
                        + ((String) arg[0]));
              }
            } else {
              // this argument points to a valid field.
              // let's check if this same field was already used as arg:
              for (int i = 0; i < j; i++) {
                if (data.argumentFieldIndices[i] == data.argumentFieldIndices[j]) {
                  // yes, it was used already. Let's check the next argument.
                  continue argumentFields;
                }
              }
              // this field was not used already, so mark it as used.
              fieldsUsedAsArgs++;
            }
          }

          if (meta.isRemoveArgumentFields()) {
            int n = data.outputRowOffset;
            data.outputRowOffset -= fieldsUsedAsArgs;
            data.inputFieldsToCopy = new int[data.outputRowOffset];

            inputFieldsToCopy:
            for (int i = 0, j = 0; i < n; i++) { // for each field in the input row
              for (int k = 0; k < argc; k++) { // for each method argument
                if (data.argumentFieldIndices[k] == i) {
                  // this input field is used as argument. Continue to the next field.
                  continue inputFieldsToCopy;
                }
              }
              // this field was not used as argument. make sure we copy it to the output.
              data.inputFieldsToCopy[j++] = i;
            }
          }
        }
        logDebug("Done looking up indices of input fields.");
        // clone the input row structure and place it in our data object
        data.outputRowMeta = getInputRowMeta().clone();
      } else {
        data.outputRowMeta = new RowMeta();
      }
      // use meta.getFields() to change it, so it reflects the output row structure
      meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);
    } // end of first

    try {
      logRowlevel("Processing 1 input row");
      prepareMethodArguments(meta, data, r);
      if (getLogLevel() == LogLevel.ROWLEVEL) {
        logRowlevel("About to invoke method");
        for (int i = 0; i < data.arguments.length; i++) {
          logRowlevel(
              "Argument "
                  + i
                  + "; "
                  + (data.arguments[i] == null
                      ? "null"
                      : data.arguments[i].toString()
                          + "; "
                          + data.arguments[i].getClass().getName()));
        }
      }

      DatabaseMetaData databaseMetaData = data.db.getConnection().getMetaData();
      ResultSet resultSet = (ResultSet) data.method.invoke(databaseMetaData, data.arguments);
      ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
      int columnCount = resultSetMetaData.getColumnCount();
      Object value;
      boolean outputRows = false;
      int k;
      while (resultSet.next()) {
        logRowlevel("Processing 1 output row.");
        Object[] outputRow = createOutputRow(meta, data, r);
        for (int i = data.outputRowOffset, j = 0; i < data.outputRowMeta.size(); i++, j++) {
          k = data.resultSetIndices[j];
          if (k > columnCount) continue;
          int type = data.outputRowMeta.getValueMeta(i).getType();
          value = getColumnValue(resultSet, resultSetMetaData, k, type);
          outputRow[i] = value;
          outputRows = true;
        }
        // put the row to the output row stream
        putRow(data.outputRowMeta, outputRow);
        logRowlevel("Done processing 1 output row.");
      }
      resultSet.close();
      if (!outputRows && meta.isAlwaysPassInputRow()) {
        Object[] outputRow = createOutputRow(meta, data, r);
        putRow(data.outputRowMeta, outputRow);
      }
      logRowlevel("Done processing 1 input row.");
    } catch (Exception exception) {
      exception.printStackTrace();
      if (exception instanceof HopException) {
        throw (HopException) exception;
      } else {
        throw new HopException(exception);
      }
    }

    // log progress if it is time to to so
    if (checkFeedback(getLinesRead())) {
      logBasic("Linenr " + getLinesRead()); // Some basic logging
    }

    if (treatAsInputTransform) {
      setOutputDone();
      return false;
    }

    // indicate that processRow() should be called again
    return true;
  }

  private static Object getColumnValue(
      ResultSet resultSet, ResultSetMetaData resultSetMetaData, int k, int type)
      throws SQLException {
    Object value;
    switch (type) {
      case IValueMeta
          .TYPE_BOOLEAN: // while the JDBC spec prescribes boolean, not all drivers actually
        // can deliver.
        boolean v;
        switch (resultSetMetaData.getColumnType(k)) {
          case Types.INTEGER:
          case Types.SMALLINT:
          case Types.TINYINT:
            v = resultSet.getInt(k) == 1;
            break;
          default:
            v = resultSet.getBoolean(k);
        }
        value = Boolean.valueOf(v);
        break;
      case IValueMeta.TYPE_INTEGER:
        value = Long.valueOf(resultSet.getInt(k));
        break;
      default:
        value = resultSet.getObject(k);
    }
    return value;
  }

  @Override
  public void dispose() {
    // clean up the database
    try {
      if (data.database != null) {
        data.database.disconnect();
        data.connection = null;
        data.database = null;
      }
    } catch (Exception ex) {
      logError("Error cleaning up database: " + ex.getMessage());
    }

    // clean up the connection
    try {
      if (data.connection != null && !data.connection.isClosed()) {
        data.connection.close();
      }
    } catch (Exception ex) {
      logError("Error cleaning up connection: " + ex.getMessage());
    }
    data.connection = null;
    data.arguments = null;
    data.method = null;
    data.argumentFieldIndices = null;
    data.inputFieldsToCopy = null;
    data.resultSetIndices = null;

    super.dispose();
  }
}
