SQOOP-2609: Provide Apache Atlas integration for Hive and HCatalog based imports.
(Balu Vellanki via Venkat Ranganathan)
diff --git a/conf/sqoop-site-template.xml b/conf/sqoop-site-template.xml
index 368af26..2182da3 100644
--- a/conf/sqoop-site-template.xml
+++ b/conf/sqoop-site-template.xml
@@ -167,4 +167,24 @@
</property>
-->
+ <!--
+ Configuration required to integrate Sqoop with Apache Atlas.
+ -->
+ <!--
+ <property>
+ <name>atlas.rest.address</name>
+ <value>http://localhost:21000/</value>
+ </property>
+ <property>
+ <name>atlas.cluster.name</name>
+ <value>primary</value>
+ </property>
+ <property>
+ <name>sqoop.job.data.publish.class</name>
+ <value>org.apache.atlas.sqoop.hook.SqoopHook</value>
+ <description>Publisher class for Sqoop job data. Atlas (or any other metadata consumer) should implement this hook.
+ </description>
+ </property>
+ -->
+
</configuration>
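
The same hook can also be enabled per job rather than cluster-wide in sqoop-site.xml. A hypothetical invocation (the connect string and table name are placeholders), assuming the publisher jar is already on the Sqoop classpath; note the generic -D option must precede the tool-specific arguments:

    sqoop import -D sqoop.job.data.publish.class=org.apache.atlas.sqoop.hook.SqoopHook \
        --connect jdbc:mysql://localhost/sales --table ORDERS --hive-import
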
diff --git a/src/java/org/apache/sqoop/SqoopJobDataPublisher.java b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
new file mode 100644
index 0000000..d77125f
--- /dev/null
+++ b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Publisher class for publishing data to a consumer upon completion of Sqoop actions.
+ * Currently supports Hive import actions only.
+ */
+public class SqoopJobDataPublisher {
+
+ public static class Data {
+
+ public static final String JDBC_STORE = "JDBCStore";
+
+ String operation;
+ String user;
+ String storeType;
+ String storeTable;
+ String storeQuery;
+ String hiveDB;
+ String hiveTable;
+ Properties commandLineOpts;
+ long startTime;
+ long endTime;
+
+ String url;
+
+ public String getOperation() {
+ return operation;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getStoreType() {
+ return storeType;
+ }
+
+ public String getStoreTable() {
+ return storeTable;
+ }
+
+ public String getStoreQuery() {
+ return storeQuery;
+ }
+
+ public String getHiveDB() {
+ return hiveDB;
+ }
+
+ public String getHiveTable() {
+ return hiveTable;
+ }
+
+ public Properties getOptions() {
+ return commandLineOpts;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public long getStartTime() { return startTime; }
+
+ public long getEndTime() { return endTime; }
+
+ private void init(String operation, String url, String user, String storeType, String storeTable,
+ String storeQuery, String hiveDB, String hiveTable, Properties commandLineOpts,
+ long startTime, long endTime) {
+ this.operation = operation;
+ this.url = url;
+ this.user = user;
+ this.storeType = storeType;
+ this.storeTable = storeTable;
+ this.storeQuery = storeQuery;
+ this.hiveDB = hiveDB;
+ if (this.hiveDB == null) {
+ this.hiveDB = SqoopHCatUtilities.DEFHCATDB;
+ }
+ this.hiveTable = hiveTable;
+ this.commandLineOpts = commandLineOpts;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ public Data(String operation, String url, String user, String storeType, String storeTable,
+ String storeQuery, String hiveDB, String hiveTable, Properties commandLineOpts,
+ long startTime, long endTime) {
+ init(operation, url, user, storeType, storeTable, storeQuery,
+ hiveDB, hiveTable, commandLineOpts, startTime, endTime);
+ }
+
+ public Data(SqoopOptions options, String tableName, long startTime, long endTime) throws IOException {
+ String hiveTableName = options.doHiveImport() ?
+ options.getHiveTableName() : options.getHCatTableName();
+ String hiveDatabase = options.doHiveImport() ?
+ options.getHiveDatabaseName() : options.getHCatDatabaseName();
+ String dataStoreType = JDBC_STORE;
+ String[] storeTypeFields = options.getConnectString().split(":");
+ if (storeTypeFields.length > 2) {
+ dataStoreType = storeTypeFields[1];
+ }
+
+ init("import", options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(),
+ dataStoreType, tableName, options.getSqlQuery(), hiveDatabase, hiveTableName,
+ options.writeProperties(), startTime, endTime);
+ }
+ }
+
+ public void publish(Data data) throws Exception {
+ // Default is a no-op; publisher implementations override this hook.
+ }
+}
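
To make the contract concrete, here is a minimal sketch of a custom publisher written against only the API above; the package and class name (org.example.sqoop.LoggingDataPublisher) are illustrative and not part of this patch:

    package org.example.sqoop;  // hypothetical package, for illustration only

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.sqoop.SqoopJobDataPublisher;

    // Logs job metadata instead of shipping it to Atlas. Enabled by setting
    // sqoop.job.data.publish.class=org.example.sqoop.LoggingDataPublisher
    public class LoggingDataPublisher extends SqoopJobDataPublisher {
      private static final Log LOG = LogFactory.getLog(LoggingDataPublisher.class);

      @Override
      public void publish(Data data) throws Exception {
        LOG.info("Sqoop " + data.getOperation() + " by " + data.getUser()
            + ": " + data.getStoreType() + " table " + data.getStoreTable()
            + " -> Hive table " + data.getHiveDB() + "." + data.getHiveTable()
            + " in " + (data.getEndTime() - data.getStartTime()) + " ms");
      }
    }

Because ImportJobBase creates the publisher with Class.newInstance(), implementations need a public no-argument constructor.
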
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index bd6e99b..7a19a62 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -105,6 +105,11 @@
*/
public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
+ /**
+ * The Configuration property identifying data publisher class.
+ */
+ public static final String DATA_PUBLISH_CLASS = "sqoop.job.data.publish.class";
+
private ConfigurationConstants() {
// Disable Explicit Object Creation
}
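
For reference, a minimal sketch of how the new constant is consumed; the class name PublisherConfigCheck is hypothetical, and sqoop-site.xml is assumed to be on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.config.ConfigurationConstants;

    public class PublisherConfigCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("sqoop-site.xml");  // assumes sqoop-site.xml is on the classpath
        // get() returns null when the property is unset, so callers must guard.
        String name = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS);
        System.out.println(name == null
            ? "no job data publisher configured"
            : "job data publisher: " + name);
      }
    }
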
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 04d60fd..9b6e1a0 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -18,11 +18,16 @@
package org.apache.sqoop.mapreduce;
-import java.io.IOException;
-import java.sql.SQLException;
-
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.io.CodecMap;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.JobBase;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.util.ImportException;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.mapred.AvroJob;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -37,16 +42,16 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.SqoopJobDataPublisher;
+import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.PerfCounters;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.io.CodecMap;
-import com.cloudera.sqoop.manager.ImportJobContext;
-import com.cloudera.sqoop.mapreduce.JobBase;
-import com.cloudera.sqoop.orm.TableClassName;
-import com.cloudera.sqoop.util.ImportException;
-import org.apache.sqoop.validation.*;
+import org.apache.sqoop.validation.ValidationContext;
+import org.apache.sqoop.validation.ValidationException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Date;
/**
* Base class for running an import MapReduce job.
@@ -55,7 +60,8 @@
public class ImportJobBase extends JobBase {
private ImportJobContext context;
-
+ private long startTime;
+ private long endTime;
public static final Log LOG = LogFactory.getLog(
ImportJobBase.class.getName());
@@ -82,6 +88,7 @@
final ImportJobContext context) {
super(opts, mapperClass, inputFormatClass, outputFormatClass);
this.context = context;
+ this.startTime = new Date().getTime();
}
/**
@@ -273,6 +280,28 @@
if (options.isValidationEnabled()) {
validateImport(tableName, conf, job);
}
+ this.endTime = new Date().getTime();
+
+ String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS);
+ if (!StringUtils.isEmpty(publishClassName)) {
+ try {
+ Class<?> publishClass = Class.forName(publishClassName);
+ Object obj = publishClass.newInstance();
+ if (obj instanceof SqoopJobDataPublisher) {
+ SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj;
+ if (options.doHiveImport() || options.getHCatTableName() != null) {
+ // We need to publish the details
+ SqoopJobDataPublisher.Data data =
+ new SqoopJobDataPublisher.Data(options, tableName, startTime, endTime);
+ publisher.publish(data);
+ }
+ } else {
+ LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring...");
+ }
+ } catch (Exception ex) {
+ LOG.warn("Unable to publish data to publisher " + ex.getMessage(), ex);
+ }
+ }
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
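
The publish step is deliberately fail-safe: any problem loading or running the publisher is logged and ignored, so a successful import is never failed by its metadata hook. The same pattern in isolation, as a sketch (the class name PublisherLoader is hypothetical):

    import org.apache.sqoop.SqoopJobDataPublisher;

    public class PublisherLoader {
      // Returns the configured publisher, or null when the name is empty, the
      // class cannot be loaded, or it is not a SqoopJobDataPublisher. Mirrors
      // the guards in ImportJobBase.runImport(): a metadata hook failure must
      // never fail an otherwise successful import.
      static SqoopJobDataPublisher load(String className) {
        if (className == null || className.isEmpty()) {
          return null;
        }
        try {
          Object obj = Class.forName(className).newInstance();
          if (obj instanceof SqoopJobDataPublisher) {
            return (SqoopJobDataPublisher) obj;
          }
        } catch (Exception ex) {
          // The patch logs a warning here; errors never propagate.
        }
        return null;
      }
    }
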
diff --git a/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
new file mode 100644
index 0000000..99bcae0
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import com.cloudera.sqoop.hive.HiveImport;
+import com.cloudera.sqoop.hive.TestHiveImport;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.tool.SqoopTool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+public class TestSqoopJobDataPublisher extends ImportJobTestCase {
+
+ public static class DummyDataPublisher extends SqoopJobDataPublisher {
+ private static String hiveTable;
+ private static String storeTable;
+ private static String storeType;
+
+ @Override
+ public void publish(SqoopJobDataPublisher.Data data) {
+ hiveTable = data.getHiveTable();
+ storeTable = data.getStoreTable();
+ storeType = data.getStoreType();
+ assert (data.getOperation().equals("import"));
+ }
+ }
+
+ public static final Log LOG = LogFactory.getLog(
+ TestSqoopJobDataPublisher.class.getName());
+
+ public void setUp() {
+ super.setUp();
+ HiveImport.setTestMode(true);
+ }
+
+ public void tearDown() {
+ super.tearDown();
+ HiveImport.setTestMode(false);
+ }
+ /**
+ * Create the argv to pass to Sqoop.
+ * @return the argv as an array of strings.
+ */
+ protected String [] getArgv(boolean includeHadoopFlags, String [] moreArgs) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ CommonArgs.addHadoopFlags(args);
+ }
+
+ args.add("-D");
+ args.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
+
+ if (null != moreArgs) {
+ for (String arg: moreArgs) {
+ args.add(arg);
+ }
+ }
+
+ args.add("--table");
+ args.add(getTableName());
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--connect");
+ args.add(getConnectString());
+ args.add("--hive-import");
+ String [] colNames = getColNames();
+ if (null != colNames) {
+ args.add("--split-by");
+ args.add(colNames[0]);
+ } else {
+ fail("Could not determine column names.");
+ }
+
+ args.add("--num-mappers");
+ args.add("1");
+
+ for (String a : args) {
+ LOG.debug("ARG : "+ a);
+ }
+
+ return args.toArray(new String[0]);
+ }
+ private void runImportTest(String tableName, String [] types,
+ String [] values, String verificationScript, String [] args,
+ SqoopTool tool) throws IOException {
+
+ // create a table and populate it with a row...
+ createTableWithColTypes(types, values);
+
+ // set up our mock hive shell to compare our generated script
+ // against the correct expected one.
+ com.cloudera.sqoop.SqoopOptions options = getSqoopOptions(args, tool);
+ String hiveHome = options.getHiveHome();
+ assertNotNull("hive.home was not set", hiveHome);
+ String testDataPath = new Path(new Path(hiveHome),
+ "scripts/" + verificationScript).toString();
+ System.setProperty("expected.script",
+ new File(testDataPath).getAbsolutePath());
+
+ // verify that we can import it correctly into hive.
+ runImport(tool, args);
+ }
+
+ private com.cloudera.sqoop.SqoopOptions getSqoopOptions(String [] args, SqoopTool tool) {
+ com.cloudera.sqoop.SqoopOptions opts = null;
+ try {
+ opts = tool.parseArguments(args, null, null, true);
+ } catch (Exception e) {
+ fail("Invalid options: " + e.toString());
+ }
+
+ return opts;
+ }
+ protected void setNumCols(int numCols) {
+ String [] cols = new String[numCols];
+ for (int i = 0; i < numCols; i++) {
+ cols[i] = "DATA_COL" + i;
+ }
+
+ setColNames(cols);
+ }
+
+ /** Test that strings and ints are handled in the normal fashion. */
+ @Test
+ public void testNormalHiveImport() throws IOException {
+ final String TABLE_NAME = "NORMAL_HIVE_IMPORT";
+ setCurTableName(TABLE_NAME);
+ setNumCols(3);
+ String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+ String [] vals = { "'test'", "42", "'somestring'" };
+ runImportTest(TABLE_NAME, types, vals, "normalImport.q",
+ getArgv(false, null), new ImportTool());
+ assertEquals("NORMAL_HIVE_IMPORT", DummyDataPublisher.hiveTable);
+ assertEquals("NORMAL_HIVE_IMPORT", DummyDataPublisher.storeTable);
+ assertEquals("hsqldb", DummyDataPublisher.storeType);
+
+ }
+
+}
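
The static fields on DummyDataPublisher are what make the assertions possible: ImportJobBase instantiates the publisher reflectively, so the test never holds a reference to the instance. The test covers only the --hive-import path; the hook also fires for HCatalog imports (options.getHCatTableName() != null in ImportJobBase). A hypothetical HCatalog variant of getArgv(), as a sketch:

    // Sketch only: with --hcatalog-table in place of --hive-import, the
    // publisher receives getHCatDatabaseName() and getHCatTableName()
    // instead of the Hive names (see the Data(SqoopOptions, ...) constructor).
    protected String [] getHCatArgv() {
      ArrayList<String> args = new ArrayList<String>();
      args.add("-D");
      args.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
      args.add("--table");
      args.add(getTableName());
      args.add("--connect");
      args.add(getConnectString());
      args.add("--hcatalog-table");
      args.add(getTableName());
      args.add("--num-mappers");
      args.add("1");
      return args.toArray(new String[0]);
    }
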