/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.util.AsyncSink;
import org.apache.sqoop.util.JdbcUrl;
import org.apache.sqoop.util.LoggingAsyncSink;
import org.apache.sqoop.util.NullAsyncSink;
import org.apache.sqoop.util.TaskId;
import org.apache.sqoop.io.NamedFifo;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.manager.MySQLUtils;

/**
 * Mapper that starts a 'mysqlimport' process and uses that to export rows from
 * HDFS to a MySQL database at high speed.
 *
 * <p>map() methods are actually provided by subclasses that read from
 * SequenceFiles (containing existing SqoopRecords) or text files
 * (containing delimited lines) and deliver these results to the fifo
 * used to interface with mysqlimport.</p>
 *
 * <p>Subclasses hand each record to {@link #writeRecord(String, String)},
 * which also takes care of periodically checkpointing the export by
 * restarting mysqlimport.</p>
 */
public class MySQLExportMapper<KEYIN, VALIN>
    extends SqoopMapper<KEYIN, VALIN, NullWritable, NullWritable> {

  public static final Log LOG = LogFactory.getLog(
      MySQLExportMapper.class.getName());

  /** Configuration key that specifies the number of bytes before which it
   * commits the current export transaction and opens a new one.
   * Default is 32 MB; setting this to 0 will use no checkpoints.
   */
  public static final String MYSQL_CHECKPOINT_BYTES_KEY =
      "sqoop.mysql.export.checkpoint.bytes";

  public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;

  // Configured value for MYSQL_CHECKPOINT_BYTES_KEY.
  protected long checkpointDistInBytes;

  /** Configuration key that specifies the number of milliseconds
   * to sleep at the end of each checkpoint commit.
   * Default is 0, no sleep.
   */
  public static final String MYSQL_CHECKPOINT_SLEEP_KEY =
      "sqoop.mysql.export.sleep.ms";

  public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;

  // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
  protected long checkpointSleepMs;

  protected Configuration conf;

  /** The FIFO being used to communicate with mysqlimport. */
  protected File fifoFile;

  /** The process object representing the active connection to mysqlimport. */
  protected Process mysqlImportProcess;

  /** The stream to write to stdin for mysqlimport. */
  protected OutputStream importStream;

  // Handlers for stdout and stderr from mysqlimport.
  protected AsyncSink outSink;
  protected AsyncSink errSink;

  /** File object where we wrote the user's password to pass to mysqlimport. */
  protected File passwordFile;

  /** Character set used to write to mysqlimport. */
  protected String mysqlCharSet;

  /**
   * Tally of bytes written to current mysqlimport instance.
   * We commit an interim tx and open a new mysqlimport after this
   * gets too big. */
  private long bytesWritten;

  /**
   * Create a named FIFO, and start mysqlimport connected to that FIFO.
   * A File object representing the FIFO is in 'fifoFile'.
   *
   * <p>On failure this method may leave a FIFO, a password file and/or a
   * running mysqlimport behind; callers must invoke
   * {@link #closeExportHandles()} afterwards to release them.</p>
   *
   * @throws IOException if the FIFO cannot be created, the database name
   *   cannot be determined, or mysqlimport cannot be started.
   */
  private void initMySQLImportProcess() throws IOException {
    File taskAttemptDir = TaskId.getLocalWorkPath(conf);

    this.fifoFile = new File(taskAttemptDir,
        conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
    String filename = fifoFile.toString();

    // Create the FIFO itself.
    try {
      new NamedFifo(this.fifoFile).create();
    } catch (IOException ioe) {
      // Command failed.
      LOG.error("Could not mknod " + filename);
      this.fifoFile = null;
      throw new IOException(
          "Could not create FIFO to interface with mysqlimport", ioe);
    }

    // Now open the connection to mysqlimport.
    ArrayList<String> args = buildMySQLImportArgs(filename);

    // Begin the export in an external process.
    LOG.debug("Starting mysqlimport with arguments:");
    for (String arg : args) {
      LOG.debug("  " + arg);
    }

    // Actually start mysqlimport.
    mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));

    // Log everything it writes to stderr.
    // Ignore anything on stdout.
    this.outSink = new NullAsyncSink();
    this.outSink.processStream(mysqlImportProcess.getInputStream());

    this.errSink = new LoggingAsyncSink(LOG);
    this.errSink.processStream(mysqlImportProcess.getErrorStream());

    // Open the named FIFO after starting mysqlimport.
    this.importStream = new BufferedOutputStream(
        new FileOutputStream(fifoFile));

    // At this point, mysqlimport is running and hooked up to our FIFO.
    // The mapper just needs to populate it with data.

    this.bytesWritten = 0;
  }

  /**
   * Assembles the mysqlimport command line from the job configuration.
   * If a password is configured it is written to a file (remembered in
   * 'passwordFile') and passed to mysqlimport via --defaults-file.
   *
   * @param filename path of the FIFO that mysqlimport should read from.
   * @return the full argument vector, beginning with the executable name.
   * @throws IOException if the database name cannot be determined from the
   *   connect string.
   */
  private ArrayList<String> buildMySQLImportArgs(String filename)
      throws IOException {
    ArrayList<String> args = new ArrayList<String>();

    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
    String databaseName = JdbcUrl.getDatabaseName(connectString);
    String hostname = JdbcUrl.getHostName(connectString);
    int port = JdbcUrl.getPort(connectString);

    if (null == databaseName) {
      throw new IOException("Could not determine database name");
    }

    args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
    String password = DBConfiguration.getPassword((JobConf) conf);

    if (null != password && password.length() > 0) {
      passwordFile = new File(MySQLUtils.writePasswordFile(conf));
      args.add("--defaults-file=" + passwordFile);
    }

    String username = conf.get(MySQLUtils.USERNAME_KEY);
    if (null != username) {
      args.add("--user=" + username);
    }

    args.add("--host=" + hostname);
    if (-1 != port) {
      args.add("--port=" + Integer.toString(port));
    }

    args.add("--compress");
    args.add("--local");
    args.add("--silent");

    // Specify the subset of columns we're importing.
    DBConfiguration dbConf = new DBConfiguration(conf);
    String [] cols = dbConf.getInputFieldNames();
    if (null != cols) {
      StringBuilder sb = new StringBuilder();
      boolean first = true;
      for (String col : cols) {
        if (!first) {
          sb.append(",");
        }
        sb.append(col);
        first = false;
      }

      args.add("--columns=" + sb.toString());
    }

    // Specify the delimiters to use.
    int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
        (int) ',');
    int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
        (int) '\n');
    int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
    int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
    boolean encloseRequired = conf.getBoolean(
        MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

    args.add(hexOption("--fields-terminated-by", outputFieldDelim));
    args.add(hexOption("--lines-terminated-by", outputRecordDelim));
    if (0 != enclosedBy) {
      if (encloseRequired) {
        args.add(hexOption("--fields-enclosed-by", enclosedBy));
      } else {
        args.add(hexOption("--fields-optionally-enclosed-by", enclosedBy));
      }
    }

    if (0 != escapedBy) {
      args.add(hexOption("--fields-escaped-by", escapedBy));
    }

    // These two arguments are positional and must be last.
    args.add(databaseName);
    args.add(filename);

    return args;
  }

  /**
   * Formats a delimiter option in the form {@code <option>=0x<hex>}.
   *
   * @param option the option name, including leading dashes.
   * @param delim the delimiter character code.
   * @return the formatted command-line argument.
   */
  private static String hexOption(String option, int delim) {
    return option + "=0x" + Integer.toString(delim, 16);
  }

  /**
   * Drives the mapper: configures it, starts mysqlimport, feeds every input
   * record to map(), and always shuts mysqlimport down afterwards.
   *
   * @throws IOException if mysqlimport cannot be started, a record cannot be
   *   written, or mysqlimport exits with an error.
   * @throws InterruptedException if interrupted while mapping or while
   *   waiting for mysqlimport.
   */
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    this.conf = context.getConfiguration();
    setup(context);
    try {
      // Started inside the try so that a partially completed start-up
      // (FIFO created, password file written) is still cleaned up below.
      initMySQLImportProcess();
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
      cleanup(context);
    } finally {
      // Shut down the mysqlimport process.
      closeExportHandles();
    }
  }

  /**
   * Closes the stream to mysqlimport, waits for the process to exit and
   * removes the password file, FIFO and task directory we created.
   * The on-disk artifacts are removed even if closing the stream fails or
   * the wait is interrupted.
   *
   * @throws IOException if the stream could not be closed cleanly or
   *   mysqlimport exited with a non-zero status.
   * @throws InterruptedException if interrupted while waiting for
   *   mysqlimport or its output handlers.
   */
  private void closeExportHandles() throws IOException, InterruptedException {
    int ret = 0;
    boolean aborted = false;
    IOException closeFailure = null;

    try {
      try {
        if (null != this.importStream) {
          // Close the stream that writes to mysqlimport's stdin first.
          LOG.debug("Closing import stream");
          try {
            this.importStream.close();
          } catch (IOException ioe) {
            // Remember the failure, but keep going so that the process is
            // reaped and the files below are removed; rethrown at the end.
            LOG.error("Could not cleanly close stream to mysqlimport", ioe);
            closeFailure = ioe;
          } finally {
            this.importStream = null;
          }
        } else if (null != this.mysqlImportProcess) {
          // mysqlimport was started but we never opened the FIFO for
          // writing, so it will never see end-of-input. Kill it rather
          // than waiting on it forever.
          LOG.warn("mysqlimport was started but never received input; "
              + "terminating it");
          this.mysqlImportProcess.destroy();
          aborted = true;
        }

        if (null != this.mysqlImportProcess) {
          // We started mysqlimport; wait for it to finish.
          LOG.info("Waiting for mysqlimport to complete");
          ret = this.mysqlImportProcess.waitFor();
          LOG.info("mysqlimport closed connection");
          this.mysqlImportProcess = null;
        }
      } finally {
        // Credentials must not outlive the process, whatever happened above.
        removePasswordFile();
      }

      joinSinks();
    } finally {
      removeFifo();
    }

    // The exit status of a process we killed ourselves is not meaningful;
    // the failure that led to the abort is reported by the caller instead.
    if (0 != ret && !aborted) {
      // Don't mark the task as successful if mysqlimport returns an error.
      throw new IOException("mysqlimport terminated with error code " + ret,
          closeFailure);
    }

    if (null != closeFailure) {
      throw closeFailure;
    }
  }

  /** Deletes the password file handed to mysqlimport, if one exists. */
  private void removePasswordFile() {
    if (null != this.passwordFile && this.passwordFile.exists()) {
      if (!this.passwordFile.delete()) {
        LOG.error("Could not remove mysql password file " + passwordFile);
        LOG.error("You should remove this file to protect your credentials.");
      }

      this.passwordFile = null;
    }
  }

  /**
   * Waits for the stdout/stderr handlers to drain mysqlimport's output.
   * This is informational only, so return codes are ignored.
   */
  private void joinSinks() throws InterruptedException {
    if (null != outSink) {
      LOG.debug("Waiting for any additional stdout from mysqlimport");
      outSink.join();
      outSink = null;
    }

    if (null != errSink) {
      LOG.debug("Waiting for any additional stderr from mysqlimport");
      errSink.join();
      errSink = null;
    }
  }

  /** Removes the FIFO and the one-off directory that contains it. */
  private void removeFifo() {
    if (this.fifoFile != null && this.fifoFile.exists()) {
      // Clean up the resources we created.
      LOG.debug("Removing fifo file");
      if (!this.fifoFile.delete()) {
        LOG.error("Could not clean up named FIFO after completing mapper");
      }

      // We put the FIFO file in a one-off subdir. Remove that.
      File fifoParentDir = this.fifoFile.getParentFile();
      LOG.debug("Removing task attempt tmpdir");
      if (!fifoParentDir.delete()) {
        LOG.error("Could not clean up task dir after completing mapper");
      }

      this.fifoFile = null;
    }
  }

  /**
   * Reads the checkpoint settings from the job configuration, falling back
   * to the defaults (with a warning) when a configured value is unusable.
   */
  @Override
  protected void setup(Context context) {
    this.conf = context.getConfiguration();

    // TODO: Support additional encodings.
    this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;

    this.checkpointDistInBytes = conf.getLong(
        MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
    if (this.checkpointDistInBytes < 0) {
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
    }

    this.checkpointSleepMs = conf.getLong(
        MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);

    if (this.checkpointSleepMs < 0) {
      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);
      this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
    }

    // Only compare against the task timeout when one is actually in force;
    // an unset or zero timeout would otherwise reject every sleep value
    // (and warn even for the default of 0).
    long taskTimeoutMs = conf.getLong("mapred.task.timeout", 0);
    if (taskTimeoutMs > 0 && this.checkpointSleepMs >= taskTimeoutMs) {
      LOG.warn("Value for "
          + MYSQL_CHECKPOINT_SLEEP_KEY
          + " has to be smaller than mapred.task.timeout");
      this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
    }
  }

  /**
   * Takes a delimited text record (e.g., the output of a 'Text' object),
   * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
   * Once more than the configured checkpoint distance has been written, the
   * current mysqlimport is shut down and a new one is started.
   *
   * @param record A delimited text representation of one record.
   * @param terminator an optional string that contains delimiters that
   *   terminate the record (if not included in 'record' itself).
   * @throws IOException if writing to the pipe or restarting mysqlimport
   *   fails.
   * @throws InterruptedException if interrupted while checkpointing.
   */
  protected void writeRecord(String record, String terminator)
      throws IOException, InterruptedException {

    // We've already set up mysqlimport to accept the same delimiters,
    // so we don't need to convert those. But our input text is UTF8
    // encoded; mysql allows configurable encoding, but defaults to
    // latin-1 (ISO8859_1). We'll convert to latin-1 for now.
    // TODO: Support user-configurable encodings.

    byte [] mysqlBytes = record.getBytes(this.mysqlCharSet);
    this.importStream.write(mysqlBytes, 0, mysqlBytes.length);
    this.bytesWritten += mysqlBytes.length;

    if (null != terminator) {
      byte [] termBytes = terminator.getBytes(this.mysqlCharSet);
      this.importStream.write(termBytes, 0, termBytes.length);
      this.bytesWritten += termBytes.length;
    }

    // If bytesWritten is too big, then we should start a new tx by closing
    // mysqlimport and opening a new instance of the process.
    if (this.checkpointDistInBytes != 0
        && this.bytesWritten > this.checkpointDistInBytes) {
      LOG.info("Checkpointing current export.");

      closeExportHandles();

      // Pause only after the commit has completed and before the next
      // mysqlimport is started, so no connection sits idle while we sleep.
      if (this.checkpointSleepMs != 0) {
        LOG.info("Pausing.");
        Thread.sleep(this.checkpointSleepMs);
      }

      initMySQLImportProcess();
      this.bytesWritten = 0;
    }
  }
}

