SQOOP-2609: Provide Apache Atlas integration for hive and hcatalog based imports.
    (Balu Vellanki via Venkat Ranganathan)
diff --git a/conf/sqoop-site-template.xml b/conf/sqoop-site-template.xml
index 368af26..2182da3 100644
--- a/conf/sqoop-site-template.xml
+++ b/conf/sqoop-site-template.xml
@@ -167,4 +167,24 @@
   </property>
   -->
 
+  <!--
+    Configuration required to integrate Sqoop with Apache Atlas.
+  -->
+  <!--
+  <property>
+    <name>atlas.rest.address</name>
+    <value>http://localhost:21000/</value>
+  </property>
+  <property>
+    <name>atlas.cluster.name</name>
+    <value>primary</value>
+  </property>
+  <property>
+    <name>sqoop.job.data.publish.class</name>
+    <value>org.apache.atlas.sqoop.hook.SqoopHook</value>
+    <description>Atlas (or any other publisher) should implement this hook.
+    </description>
+  </property>
+  -->
+
 </configuration>
diff --git a/src/java/org/apache/sqoop/SqoopJobDataPublisher.java b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
new file mode 100644
index 0000000..d77125f
--- /dev/null
+++ b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Publisher class for publising data to a consumer upon completion of Sqoop actions.
+ * Currently supports Hive import actions only.
+ */
+public class SqoopJobDataPublisher {
+
+    public static class Data {
+
+        public static final String JDBC_STORE = "JDBCStore";
+
+        String operation;
+        String user;
+        String storeType;
+        String storeTable;
+        String storeQuery;
+        String hiveDB;
+        String hiveTable;
+        Properties commandLineOpts;
+        long startTime;
+        long endTime;
+
+        String url;
+
+        public String getOperation() {
+            return operation;
+        }
+
+        public String getUser() {
+            return user;
+        }
+
+        public String getStoreType() {
+            return storeType;
+        }
+
+        public String getStoreTable() {
+            return storeTable;
+        }
+
+        public String getStoreQuery() {
+            return storeQuery;
+        }
+
+        public String getHiveDB() {
+            return hiveDB;
+        }
+
+        public String getHiveTable() {
+            return hiveTable;
+        }
+
+        public Properties getOptions() {
+            return commandLineOpts;
+        }
+
+        public String getUrl() {
+            return url;
+        }
+
+        public long getStartTime() { return startTime; }
+
+        public long getEndTime() { return endTime; }
+
+        private void init(String operation, String url, String user, String storeType, String storeTable,
+                          String storeQuery, String hiveDB, String hiveTable, Properties commandLineOpts,
+                          long startTime, long endTime) {
+            this.operation = operation;
+            this.url = url;
+            this.user = user;
+            this.storeType = storeType;
+            this.storeTable = storeTable;
+            this.storeQuery = storeQuery;
+            this.hiveDB = hiveDB;
+            if (this.hiveDB == null) {
+                this.hiveDB =   SqoopHCatUtilities.DEFHCATDB;
+            }
+            this.hiveTable = hiveTable;
+            this.commandLineOpts = commandLineOpts;
+            this.startTime = startTime;
+            this.endTime = endTime;
+        }
+
+        public Data(String operation, String url, String user, String storeType, String storeTable,
+                              String storeQuery, String hiveDB, String hiveTable, Properties commandLineOpts,
+                              long startTime, long endTime) {
+            init(operation, url, user, storeType, storeTable, storeQuery,
+                    hiveDB, hiveTable, commandLineOpts, startTime, endTime);
+        }
+
+        public Data(SqoopOptions options, String tableName, long startTime, long endTime) throws IOException {
+            String hiveTableName = options.doHiveImport() ?
+                    options.getHiveTableName() : options.getHCatTableName();
+            String hiveDatabase = options.doHiveImport() ?
+                    options.getHiveDatabaseName() : options.getHCatDatabaseName();
+            String dataStoreType = JDBC_STORE;
+            String[] storeTypeFields = options.getConnectString().split(":");
+            if (storeTypeFields.length > 2) {
+                dataStoreType = storeTypeFields[1];
+            }
+
+            init("import", options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(),
+                    dataStoreType, tableName, options.getSqlQuery(), hiveDatabase, hiveTableName,
+                    options.writeProperties(), startTime, endTime);
+        }
+    }
+
+    public void publish(Data data) throws Exception{
+
+    }
+}
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index bd6e99b..7a19a62 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -105,6 +105,11 @@
    */
   public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
 
+  /**
+   * The Configuration property identifying data publisher class.
+   */
+  public static final String DATA_PUBLISH_CLASS = "sqoop.job.data.publish.class";
+
   private ConfigurationConstants() {
     // Disable Explicit Object Creation
   }
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 04d60fd..9b6e1a0 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -18,11 +18,16 @@
 
 package org.apache.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.io.CodecMap;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.JobBase;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.util.ImportException;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.mapred.AvroJob;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,16 +42,16 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.SqoopJobDataPublisher;
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.PerfCounters;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.io.CodecMap;
-import com.cloudera.sqoop.manager.ImportJobContext;
-import com.cloudera.sqoop.mapreduce.JobBase;
-import com.cloudera.sqoop.orm.TableClassName;
-import com.cloudera.sqoop.util.ImportException;
-import org.apache.sqoop.validation.*;
+import org.apache.sqoop.validation.ValidationContext;
+import org.apache.sqoop.validation.ValidationException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Date;
 
 /**
  * Base class for running an import MapReduce job.
@@ -55,7 +60,8 @@
 public class ImportJobBase extends JobBase {
 
   private ImportJobContext context;
-
+  private long startTime;
+  private long endTime;
   public static final Log LOG = LogFactory.getLog(
       ImportJobBase.class.getName());
 
@@ -82,6 +88,7 @@
       final ImportJobContext context) {
     super(opts, mapperClass, inputFormatClass, outputFormatClass);
     this.context = context;
+    this.startTime = new Date().getTime();
   }
 
   /**
@@ -273,6 +280,28 @@
       if (options.isValidationEnabled()) {
         validateImport(tableName, conf, job);
       }
+      this.endTime = new Date().getTime();
+
+      String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS);
+      if (!StringUtils.isEmpty(publishClassName)) {
+        try {
+          Class publishClass =  Class.forName(publishClassName);
+          Object obj = publishClass.newInstance();
+          if (obj instanceof SqoopJobDataPublisher) {
+            SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj;
+            if (options.doHiveImport() || options.getHCatTableName() != null) {
+              // We need to publish the details
+              SqoopJobDataPublisher.Data data =
+                      new SqoopJobDataPublisher.Data(options, tableName, startTime, endTime);
+              publisher.publish(data);
+            }
+          } else {
+            LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring...");
+          }
+        } catch (Exception ex) {
+          LOG.warn("Unable to publish data to publisher " + ex.getMessage(), ex);
+        }
+      }
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     } catch (ClassNotFoundException cnfe) {
diff --git a/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
new file mode 100644
index 0000000..99bcae0
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import com.cloudera.sqoop.hive.HiveImport;
+import com.cloudera.sqoop.hive.TestHiveImport;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.tool.SqoopTool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+public class TestSqoopJobDataPublisher extends ImportJobTestCase {
+
+    public static class DummyDataPublisher extends SqoopJobDataPublisher {
+        private static String hiveTable;
+        private static String storeTable;
+        private static String storeType;
+
+        @Override
+        public void publish(SqoopJobDataPublisher.Data data) {
+            hiveTable = data.getHiveTable();
+            storeTable = data.getStoreTable();
+            storeType = data.getStoreType();
+            assert (data.getOperation().equals("import"));
+        }
+    }
+
+    public static final Log LOG = LogFactory.getLog(
+            TestHiveImport.class.getName());
+
+    public void setUp() {
+        super.setUp();
+        HiveImport.setTestMode(true);
+    }
+
+    public void tearDown() {
+        super.tearDown();
+        HiveImport.setTestMode(false);
+    }
+    /**
+     * Create the argv to pass to Sqoop.
+     * @return the argv as an array of strings.
+     */
+    protected String [] getArgv(boolean includeHadoopFlags, String [] moreArgs) {
+        ArrayList<String> args = new ArrayList<String>();
+
+        if (includeHadoopFlags) {
+            CommonArgs.addHadoopFlags(args);
+        }
+
+        args.add("-D");
+        args.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
+
+        if (null != moreArgs) {
+            for (String arg: moreArgs) {
+                args.add(arg);
+            }
+        }
+
+        args.add("--table");
+        args.add(getTableName());
+        args.add("--warehouse-dir");
+        args.add(getWarehouseDir());
+        args.add("--connect");
+        args.add(getConnectString());
+        args.add("--hive-import");
+        String [] colNames = getColNames();
+        if (null != colNames) {
+            args.add("--split-by");
+            args.add(colNames[0]);
+        } else {
+            fail("Could not determine column names.");
+        }
+
+        args.add("--num-mappers");
+        args.add("1");
+
+        for (String a : args) {
+            LOG.debug("ARG : "+ a);
+        }
+
+        return args.toArray(new String[0]);
+    }
+    private void runImportTest(String tableName, String [] types,
+                               String [] values, String verificationScript, String [] args,
+                               SqoopTool tool) throws IOException {
+
+        // create a table and populate it with a row...
+        createTableWithColTypes(types, values);
+
+        // set up our mock hive shell to compare our generated script
+        // against the correct expected one.
+        com.cloudera.sqoop.SqoopOptions options = getSqoopOptions(args, tool);
+        String hiveHome = options.getHiveHome();
+        assertNotNull("hive.home was not set", hiveHome);
+        String testDataPath = new Path(new Path(hiveHome),
+                "scripts/" + verificationScript).toString();
+        System.setProperty("expected.script",
+                new File(testDataPath).getAbsolutePath());
+
+        // verify that we can import it correctly into hive.
+        runImport(tool, args);
+    }
+
+    private com.cloudera.sqoop.SqoopOptions getSqoopOptions(String [] args, SqoopTool tool) {
+        com.cloudera.sqoop.SqoopOptions opts = null;
+        try {
+            opts = tool.parseArguments(args, null, null, true);
+        } catch (Exception e) {
+            fail("Invalid options: " + e.toString());
+        }
+
+        return opts;
+    }
+    protected void setNumCols(int numCols) {
+        String [] cols = new String[numCols];
+        for (int i = 0; i < numCols; i++) {
+            cols[i] = "DATA_COL" + i;
+        }
+
+        setColNames(cols);
+    }
+
+    /** Test that strings and ints are handled in the normal fashion. */
+    @Test
+    public void testNormalHiveImport() throws IOException {
+        final String TABLE_NAME = "NORMAL_HIVE_IMPORT";
+        setCurTableName(TABLE_NAME);
+        setNumCols(3);
+        String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+        String [] vals = { "'test'", "42", "'somestring'" };
+        runImportTest(TABLE_NAME, types, vals, "normalImport.q",
+                getArgv(false, null), new ImportTool());
+        assert (DummyDataPublisher.hiveTable.equals("NORMAL_HIVE_IMPORT"));
+        assert (DummyDataPublisher.storeTable.equals("NORMAL_HIVE_IMPORT"));
+        assert (DummyDataPublisher.storeType.equals("hsqldb"));
+
+    }
+
+}