BIGTOP-3490. Bump Sqoop to 1.4.7. (#726)

diff --git a/bigtop-packages/src/common/sqoop/do-component-build b/bigtop-packages/src/common/sqoop/do-component-build
index 77be764..4818213 100755
--- a/bigtop-packages/src/common/sqoop/do-component-build
+++ b/bigtop-packages/src/common/sqoop/do-component-build
@@ -17,9 +17,9 @@
 set -ex
 . `dirname ${0}`/bigtop.bom
 
-# Use of StringUtils.repeat in src/test/org/apache/sqoop/TestExportUsingProcedure
-# is incompatible with Apache Commons Lang 2.4
-sed -i -e 's/sql.append(StringUtils.repeat("?", ",  ",/sql.append(StringUtils.repeat("?",/' src/test/org/apache/sqoop/TestExportUsingProcedure.java
-
-ant -f build.xml -Dhadoopversion=210 -Dhadoop.version.full=$HADOOP_VERSION -Dhadoop.version=$HADOOP_VERSION -Dhbase.version=$HBASE_VERSION -Dzookeeper.version=$ZOOKEEPER_VERSION package jar "$@"
-
+ant -f build.xml \
+  -Dhadoop.version=$HADOOP_VERSION \
+  -Dhbase.version=$HBASE_VERSION \
+  -Dhcatalog.version=$HIVE_VERSION \
+  -Dzookeeper.version=$ZOOKEEPER_VERSION \
+  package jar "$@"
diff --git a/bigtop-packages/src/common/sqoop/patch1-SQOOP-3232.diff b/bigtop-packages/src/common/sqoop/patch1-SQOOP-3232.diff
new file mode 100644
index 0000000..0cbf6c3
--- /dev/null
+++ b/bigtop-packages/src/common/sqoop/patch1-SQOOP-3232.diff
@@ -0,0 +1,615 @@
+diff --git a/ivy.xml b/ivy.xml
+index e4b45bfd..263bcc81 100644
+--- a/ivy.xml
++++ b/ivy.xml
+@@ -162,6 +162,15 @@ under the License.
+       <exclude org="org.apache.thrift" module="thrift"/>
+       <exclude org="log4j" module="log4j"/>
+     </dependency>
++    <dependency org="org.apache.hbase" name="hbase-zookeeper" rev="${hbase.version}" conf="common->default">
++      <artifact name="hbase-zookeeper" type="jar"/>
++      <artifact name="hbase-zookeeper" type="jar" ext="jar" m:classifier="tests"/>
++      <exclude org="com.sun.jersey" module="jersey-core"/>
++      <exclude org="com.sun.jersey" module="jersey-json"/>
++      <exclude org="com.sun.jersey" module="jersey-server"/>
++      <exclude org="org.apache.thrift" module="thrift"/>
++      <exclude org="log4j" module="log4j"/>
++    </dependency>
+     <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase.version}" conf="common->default">
+       <artifact name="hbase-hadoop-compat" type="jar"/>
+       <artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/>
+diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+index 032fd38a..cf97b8a6 100644
+--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
++++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+@@ -25,8 +25,11 @@
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.BufferedMutator;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
+ import org.apache.hadoop.hbase.client.Delete;
+-import org.apache.hadoop.hbase.client.HTable;
+ import org.apache.hadoop.hbase.client.Mutation;
+ import org.apache.hadoop.hbase.client.Put;
+ import org.apache.hadoop.hbase.util.Bytes;
+@@ -84,12 +87,19 @@
+   // into a Put command.
+   private PutTransformer putTransformer;
+ 
+-  private String tableName;
+-  private HTable table;
++  private Connection hbaseConnection;
++  private BufferedMutator bufferedMutator;
+ 
+   public HBasePutProcessor() {
+   }
+ 
++  HBasePutProcessor(Configuration conf, PutTransformer putTransformer, Connection hbaseConnection, BufferedMutator bufferedMutator) {
++    this.conf = conf;
++    this.putTransformer = putTransformer;
++    this.hbaseConnection = hbaseConnection;
++    this.bufferedMutator = bufferedMutator;
++  }
++
+   @Override
+   @SuppressWarnings("unchecked")
+   public void setConf(Configuration config) {
+@@ -106,15 +116,24 @@ public void setConf(Configuration config) {
+       throw new RuntimeException("Could not instantiate PutTransformer.");
+     }
+     putTransformer.init(conf);
++    initHBaseMutator();
++  }
+ 
+-    this.tableName = conf.get(TABLE_NAME_KEY, null);
++  private void initHBaseMutator() {
++    String tableName = conf.get(TABLE_NAME_KEY, null);
+     try {
+-      this.table = new HTable(conf, this.tableName);
+-    } catch (IOException ioe) {
+-      throw new RuntimeException("Could not access HBase table " + tableName,
+-          ioe);
++      hbaseConnection = ConnectionFactory.createConnection(conf);
++      bufferedMutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName));
++    } catch (IOException e) {
++      if (hbaseConnection != null) {
++        try {
++          hbaseConnection.close();
++        } catch (IOException connCloseException){
++          LOG.error("Cannot close HBase connection.", connCloseException);
++        }
++      }
++      throw new RuntimeException("Could not create mutator for HBase table " + tableName, e);
+     }
+-    this.table.setAutoFlush(false);
+   }
+ 
+   @Override
+@@ -131,38 +150,54 @@ public void accept(FieldMappable record)
+       throws IOException, ProcessingException {
+     Map<String, Object> fields = record.getFieldMap();
+     List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
+-    if (null != mutationList) {
+-      for (Mutation mutation : mutationList) {
+-        if (mutation!=null) {
+-            if(mutation instanceof Put) {
+-              Put putObject = (Put) mutation;
+-              if (putObject.isEmpty()) {
+-                LOG.warn("Could not insert row with no columns "
+-                      + "for row-key column: " + Bytes.toString(putObject.getRow()));
+-                } else {
+-                  this.table.put(putObject);
+-                }
+-              } else if(mutation instanceof Delete) {
+-                Delete deleteObject = (Delete) mutation;
+-                if (deleteObject.isEmpty()) {
+-                  LOG.warn("Could not delete row with no columns "
+-                        + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
+-                } else {
+-                  this.table.delete(deleteObject);
+-                }
+-            }
+-        }
++    if (mutationList == null) {
++      return;
++    }
++    for (Mutation mutation : mutationList) {
++      if (!canAccept(mutation)) {
++        continue;
++      }
++      if (!mutation.isEmpty()) {
++        bufferedMutator.mutate(mutation);
++      } else {
++        logEmptyMutation(mutation);
+       }
+     }
+   }
+ 
++  private void logEmptyMutation(Mutation mutation) {
++    String action = null;
++    if (mutation instanceof Put) {
++      action = "insert";
++    } else if (mutation instanceof Delete) {
++      action = "delete";
++    }
++    LOG.warn("Could not " + action + " row with no columns "
++        + "for row-key column: " + Bytes.toString(mutation.getRow()));
++  }
++
++  private boolean canAccept(Mutation mutation) {
++    return mutation != null && (mutation instanceof Put || mutation instanceof Delete);
++  }
++
+   @Override
+   /**
+    * Closes the HBase table and commits all pending operations.
+    */
+   public void close() throws IOException {
+-    this.table.flushCommits();
+-    this.table.close();
++    try {
++      bufferedMutator.flush();
++    } finally {
++      try {
++        bufferedMutator.close();
++      } finally {
++        try {
++          hbaseConnection.close();
++        } catch (IOException e) {
++          LOG.error("Cannot close HBase connection.", e);
++        }
++      }
++    }
+   }
+ 
+ }
+diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+index 2bbfffe0..ed89aeb1 100644
+--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
++++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+@@ -26,10 +26,14 @@
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.FsPermission;
+-import org.apache.hadoop.hbase.client.HTable;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.Admin;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
+ import org.apache.hadoop.hbase.client.Put;
++import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
++import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+ import org.apache.hadoop.mapreduce.Job;
+@@ -50,6 +54,8 @@
+   public static final Log LOG = LogFactory.getLog(
+       HBaseBulkImportJob.class.getName());
+ 
++  private Connection hbaseConnection;
++
+   public HBaseBulkImportJob(final SqoopOptions opts,
+       final ImportJobContext importContext) {
+     super(opts, importContext);
+@@ -81,8 +87,21 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+ 
+     TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
+     FileOutputFormat.setOutputPath(job, getContext().getDestination());
+-    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+-    HFileOutputFormat.configureIncrementalLoad(job, hTable);
++    TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
++    hbaseConnection = ConnectionFactory.createConnection(job.getConfiguration());
++
++    try (
++        Table hbaseTable = hbaseConnection.getTable(hbaseTableName)
++    ) {
++      HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
++    } catch (IOException | RuntimeException e) {
++      try {
++        hbaseConnection.close();
++      } catch (IOException ioException) {
++        LOG.error("Cannot close HBase connection.", ioException);
++      }
++      throw e;
++    }
+   }
+ 
+   /**
+@@ -99,15 +118,16 @@ protected void completeImport(Job job) throws IOException, ImportException {
+     setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
+       FsPermission.createImmutable((short) 00777));
+ 
+-    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
++    TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
+ 
+     // Load generated HFiles into table
+-    try {
+-      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+-        job.getConfiguration());
+-      loader.doBulkLoad(bulkLoadDir, hTable);
+-    }
+-    catch (Exception e) {
++    try (
++        Table hbaseTable = hbaseConnection.getTable(hbaseTableName);
++        Admin hbaseAdmin = hbaseConnection.getAdmin()
++    ) {
++      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(job.getConfiguration());
++      loader.doBulkLoad(bulkLoadDir, hbaseAdmin, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
++    } catch (Exception e) {
+       String errorMessage = String.format("Unrecoverable error while " +
+         "performing the bulk load of files in [%s]",
+         bulkLoadDir.toString());
+@@ -117,11 +137,19 @@ protected void completeImport(Job job) throws IOException, ImportException {
+ 
+   @Override
+   protected void jobTeardown(Job job) throws IOException, ImportException {
+-    super.jobTeardown(job);
+-    // Delete the hfiles directory after we are finished.
+-    Path destination = getContext().getDestination();
+-    FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+-    fileSystem.delete(destination, true);
++    try {
++	    super.jobTeardown(job);
++      // Delete the hfiles directory after we are finished.
++      Path destination = getContext().getDestination();
++      FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
++      fileSystem.delete(destination, true);
++    } finally {
++      try {
++        hbaseConnection.close();
++      } catch (IOException e) {
++        LOG.error("Cannot close HBase connection.", e);
++      }
++    }
+   }
+ 
+   /**
+diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+index 523d0a7e..5adb7883 100644
+--- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
++++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+@@ -19,7 +19,6 @@
+ package org.apache.sqoop.mapreduce;
+ 
+ import java.io.IOException;
+-import java.lang.reflect.InvocationTargetException;
+ import java.lang.reflect.Method;
+ 
+ import org.apache.commons.logging.Log;
+@@ -28,10 +27,14 @@
+ import org.apache.hadoop.hbase.HBaseConfiguration;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+-import org.apache.hadoop.hbase.client.HBaseAdmin;
+-import org.apache.hadoop.hbase.client.HTable;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.Admin;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
++import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+ import org.apache.hadoop.hbase.security.User;
++import org.apache.hadoop.hbase.security.token.TokenUtil;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.mapreduce.Job;
+@@ -160,65 +163,28 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+       HBaseConfiguration.addHbaseResources(conf);
+     }
+ 
+-    HBaseAdmin admin = new HBaseAdmin(conf);
++    Connection hbaseConnection = ConnectionFactory.createConnection(conf);
++    Admin admin = hbaseConnection.getAdmin();
+ 
+     if (!skipDelegationTokens(conf)) {
+-      // Add authentication token to the job if we're running on secure cluster.
+-      //
+-      // We're currently supporting HBase version 0.90 that do not have security
+-      // patches which means that it do not have required methods
+-      // "isSecurityEnabled" and "obtainAuthTokenForJob".
+-      //
+-      // We're using reflection API to see if those methods are available and call
+-      // them only if they are present.
+-      //
+-      // After we will remove support for HBase 0.90 we can simplify the code to
+-      // following code fragment:
+-      /*
+       try {
+-        if (User.isSecurityEnabled()) {
+-          User user = User.getCurrent();
+-          user.obtainAuthTokenForJob(conf, job);
++        if (User.isHBaseSecurityEnabled(conf)) {
++          TokenUtil.obtainTokenForJob(hbaseConnection, User.getCurrent(), job);
+         }
+       } catch(InterruptedException ex) {
+         throw new ImportException("Can't get authentication token", ex);
+       }
+-      */
+-      try {
+-        // Get method isSecurityEnabled
+-        Method isHBaseSecurityEnabled = User.class.getMethod(
+-            "isHBaseSecurityEnabled", Configuration.class);
+-
+-        // Get method obtainAuthTokenForJob
+-        Method obtainAuthTokenForJob = User.class.getMethod(
+-            "obtainAuthTokenForJob", Configuration.class, Job.class);
+-
+-        // Get current user
+-        User user = User.getCurrent();
+-
+-        // Obtain security token if needed
+-        if ((Boolean)isHBaseSecurityEnabled.invoke(null, conf)) {
+-          obtainAuthTokenForJob.invoke(user, conf, job);
+-        }
+-      } catch (NoSuchMethodException e) {
+-        LOG.info("It seems that we're running on HBase without security"
+-            + " additions. Security additions will not be used during this job.");
+-      } catch (InvocationTargetException e) {
+-        throw new ImportException("Can't get authentication token", e);
+-      } catch (IllegalAccessException e) {
+-        throw new ImportException("Can't get authentication token", e);
+-      }
+     }
+ 
+     // Check to see if the table exists.
+     HTableDescriptor tableDesc = null;
+     byte [] familyBytes = Bytes.toBytes(familyName);
+     HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
+-    if (!admin.tableExists(tableName)) {
++    if (!admin.tableExists(TableName.valueOf(tableName))) {
+       if (options.getCreateHBaseTable()) {
+         // Create the table.
+         LOG.info("Creating missing HBase table " + tableName);
+-        tableDesc =  new HTableDescriptor(tableName);
++        tableDesc =  new HTableDescriptor(TableName.valueOf(tableName));
+         tableDesc.addFamily(colDesc);
+         admin.createTable(tableDesc);
+       } else {
+@@ -228,16 +194,16 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+       }
+     } else {
+       // Table exists, so retrieve their current version
+-      tableDesc = admin.getTableDescriptor(Bytes.toBytes(tableName));
++	    tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
+ 
+       // Check if current version do have specified column family
+       if (!tableDesc.hasFamily(familyBytes)) {
+         if (options.getCreateHBaseTable()) {
+           // Create the column family.
+           LOG.info("Creating missing column family " + familyName);
+-          admin.disableTable(tableName);
+-          admin.addColumn(tableName, colDesc);
+-          admin.enableTable(tableName);
++          admin.disableTable(TableName.valueOf(tableName));
++          admin.addColumn(TableName.valueOf(tableName), colDesc);
++          admin.enableTable(TableName.valueOf(tableName));
+         } else {
+           LOG.warn("Could not find column family " + familyName + " in table "
+             + tableName);
+@@ -250,10 +216,11 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+     // Make sure we close the connection to HBA, this is only relevant in
+     // unit tests
+     admin.close();
++    hbaseConnection.close();
+ 
+     // Make sure HBase libraries are shipped as part of the job.
+     TableMapReduceUtil.addDependencyJars(job);
+-    TableMapReduceUtil.addDependencyJars(conf, HTable.class);
++    TableMapReduceUtil.addDependencyJars(conf, Table.class);
+ 
+     super.jobSetup(job);
+   }
+diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+index d9f74952..9d365ebb 100644
+--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
++++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+@@ -38,6 +38,10 @@
+ import org.apache.hadoop.hbase.HBaseTestingUtility;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.MiniHBaseCluster;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
++import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.client.Get;
+ import org.apache.hadoop.hbase.client.HTable;
+ import org.apache.hadoop.hbase.client.Result;
+@@ -236,9 +240,10 @@ protected void verifyHBaseCell(String tableName, String rowKey,
+       String colFamily, String colName, String val) throws IOException {
+     Get get = new Get(Bytes.toBytes(rowKey));
+     get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
+-    HTable table = new HTable(new Configuration(
+-        hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
+-    try {
++    try (
++        Connection hbaseConnection = createHBaseConnection();
++        Table table = getHBaseTable(hbaseConnection, tableName)
++    ) {
+       Result r = table.get(get);
+       byte [] actualVal = r.getValue(Bytes.toBytes(colFamily),
+           Bytes.toBytes(colName));
+@@ -248,8 +253,6 @@ protected void verifyHBaseCell(String tableName, String rowKey,
+         assertNotNull("No result, but we expected one", actualVal);
+         assertEquals(val, Bytes.toString(actualVal));
+       }
+-    } finally {
+-      table.close();
+     }
+   }
+   public static File createTempDir() {
+@@ -264,18 +267,25 @@ public static File createTempDir() {
+   protected int countHBaseTable(String tableName, String colFamily)
+       throws IOException {
+     int count = 0;
+-    HTable table = new HTable(new Configuration(
+-        hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
+-    try {
++    try (
++        Connection hbaseConnection = createHBaseConnection();
++        Table table = getHBaseTable(hbaseConnection, tableName)
++    ) {
+       ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily));
+       for(Result result = scanner.next();
+           result != null;
+           result = scanner.next()) {
+         count++;
+       }
+-    } finally {
+-      table.close();
+     }
+     return count;
+   }
++
++  private Connection createHBaseConnection() throws IOException {
++    return ConnectionFactory.createConnection(new Configuration(hbaseTestUtil.getConfiguration()));
++  }
++
++  private Table getHBaseTable(Connection connection, String tableName) throws IOException {
++    return connection.getTable(TableName.valueOf(tableName));
++  }
+ }
+diff --git a/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
+new file mode 100644
+index 00000000..73b31772
+--- /dev/null
++++ b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
+@@ -0,0 +1,133 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.sqoop.hbase;
++
++import com.cloudera.sqoop.lib.FieldMappable;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hbase.client.BufferedMutator;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.Delete;
++import org.apache.hadoop.hbase.client.Mutation;
++import org.apache.hadoop.hbase.client.Put;
++import org.apache.sqoop.util.ExpectedLogMessage;
++import org.junit.Before;
++import org.junit.Rule;
++import org.junit.Test;
++
++import java.util.Arrays;
++import java.util.List;
++
++import static java.util.Collections.singletonList;
++import static org.mockito.Matchers.anyMap;
++import static org.mockito.Mockito.mock;
++import static org.mockito.Mockito.verify;
++import static org.mockito.Mockito.verifyNoMoreInteractions;
++import static org.mockito.Mockito.when;
++
++public class TestHBasePutProcessor {
++
++  @Rule
++  public ExpectedLogMessage expectedLogMessage = new ExpectedLogMessage();
++
++  private Configuration configuration;
++  private Connection hbaseConnection;
++  private PutTransformer putTransformer;
++  private BufferedMutator bufferedMutator;
++  private FieldMappable fieldMappable;
++
++  private HBasePutProcessor hBasePutProcessor;
++
++  @Before
++  public void before() {
++    configuration = mock(Configuration.class);
++    hbaseConnection = mock(Connection.class);
++    putTransformer = mock(PutTransformer.class);
++    bufferedMutator = mock(BufferedMutator.class);
++    fieldMappable = mock(FieldMappable.class);
++
++    hBasePutProcessor = new HBasePutProcessor(configuration, putTransformer, hbaseConnection, bufferedMutator);
++  }
++
++  @Test
++  public void testNoMutationIsDoneWhenNullListIsReceived() throws Exception {
++    when(putTransformer.getMutationCommand(anyMap())).thenReturn(null);
++    verifyNoMoreInteractions(bufferedMutator);
++
++    hBasePutProcessor.accept(fieldMappable);
++  }
++
++  @Test
++  public void testNoMutationIsDoneWhenListContainingNullsIsReceived() throws Exception {
++    List<Mutation> inputList = Arrays.asList(null, null, null);
++    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++    verifyNoMoreInteractions(bufferedMutator);
++
++    hBasePutProcessor.accept(fieldMappable);
++  }
++
++  @Test
++  public void testNoMutationIsDoneWhenListContainingUnknownMutationIsReceived() throws Exception {
++    List<Mutation> inputList = singletonList(mock(Mutation.class));
++    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++    verifyNoMoreInteractions(bufferedMutator);
++
++    hBasePutProcessor.accept(fieldMappable);
++  }
++
++  @Test
++  public void testWarningIsLoggedWhenListContainingEmptyPutIsReceived() throws Exception {
++    Mutation emptyPutMutation = mock(Put.class);
++    when(emptyPutMutation.getRow()).thenReturn("emptyPutMutation".getBytes());
++    when(emptyPutMutation.isEmpty()).thenReturn(true);
++    List<Mutation> inputList = singletonList(emptyPutMutation);
++    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++    verifyNoMoreInteractions(bufferedMutator);
++    expectedLogMessage.expectWarn("Could not insert row with no columns for row-key column: emptyPutMutation");
++
++    hBasePutProcessor.accept(fieldMappable);
++  }
++
++  @Test
++  public void testWarningIsLoggedWhenListContainingEmptyDeleteIsReceived() throws Exception {
++    Mutation emptyDeleteMutation = mock(Delete.class);
++    when(emptyDeleteMutation.getRow()).thenReturn("emptyDeleteMutation".getBytes());
++    when(emptyDeleteMutation.isEmpty()).thenReturn(true);
++    List<Mutation> inputList = singletonList(emptyDeleteMutation);
++    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++    verifyNoMoreInteractions(bufferedMutator);
++    expectedLogMessage.expectWarn("Could not delete row with no columns for row-key column: emptyDeleteMutation");
++
++    hBasePutProcessor.accept(fieldMappable);
++  }
++
++  @Test
++  public void testMutationIsDoneForAllElementsWhenListContainingValidMutationsIsReceived() throws Exception {
++    Mutation aPutMutation = mock(Put.class);
++    Mutation aDeleteMutation = mock(Delete.class);
++    Mutation anotherPutMutation = mock(Put.class);
++    List<Mutation> inputList = Arrays.asList(aPutMutation, aDeleteMutation, anotherPutMutation);
++    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++
++    hBasePutProcessor.accept(fieldMappable);
++
++    verify(bufferedMutator).mutate(aPutMutation);
++    verify(bufferedMutator).mutate(aDeleteMutation);
++    verify(bufferedMutator).mutate(anotherPutMutation);
++  }
++
++}
+\ No newline at end of file
diff --git a/bigtop.bom b/bigtop.bom
index cb12bc0..5b2a761 100644
--- a/bigtop.bom
+++ b/bigtop.bom
@@ -200,7 +200,7 @@
     'sqoop' {
       name    = 'sqoop'
       relNotes = 'Apache Sqoop v1'
-      version { base = '1.4.6'; pkg = base; release = 1 }
+      version { base = '1.4.7'; pkg = base; release = 1 }
       tarball { destination = "${name}-${version.base}.tar.gz"
                 source      = destination }
       url     { download_path = "/$name/${version.base}/"