BIGTOP-3490. Bump Sqoop to 1.4.7. (#726)
diff --git a/bigtop-packages/src/common/sqoop/do-component-build b/bigtop-packages/src/common/sqoop/do-component-build
index 77be764..4818213 100755
--- a/bigtop-packages/src/common/sqoop/do-component-build
+++ b/bigtop-packages/src/common/sqoop/do-component-build
@@ -17,9 +17,9 @@
set -ex
. `dirname ${0}`/bigtop.bom
-# Use of StringUtils.repeat in src/test/org/apache/sqoop/TestExportUsingProcedure
-# is incompatible with Apache Commons Lang 2.4
-sed -i -e 's/sql.append(StringUtils.repeat("?", ", ",/sql.append(StringUtils.repeat("?",/' src/test/org/apache/sqoop/TestExportUsingProcedure.java
-
-ant -f build.xml -Dhadoopversion=210 -Dhadoop.version.full=$HADOOP_VERSION -Dhadoop.version=$HADOOP_VERSION -Dhbase.version=$HBASE_VERSION -Dzookeeper.version=$ZOOKEEPER_VERSION package jar "$@"
-
+ant -f build.xml \
+ -Dhadoop.version=$HADOOP_VERSION \
+ -Dhbase.version=$HBASE_VERSION \
+ -Dhcatalog.version=$HIVE_VERSION \
+ -Dzookeeper.version=$ZOOKEEPER_VERSION \
+ package jar "$@"
diff --git a/bigtop-packages/src/common/sqoop/patch1-SQOOP-3232.diff b/bigtop-packages/src/common/sqoop/patch1-SQOOP-3232.diff
new file mode 100644
index 0000000..0cbf6c3
--- /dev/null
+++ b/bigtop-packages/src/common/sqoop/patch1-SQOOP-3232.diff
@@ -0,0 +1,615 @@
+diff --git a/ivy.xml b/ivy.xml
+index e4b45bfd..263bcc81 100644
+--- a/ivy.xml
++++ b/ivy.xml
+@@ -162,6 +162,15 @@ under the License.
+ <exclude org="org.apache.thrift" module="thrift"/>
+ <exclude org="log4j" module="log4j"/>
+ </dependency>
++ <dependency org="org.apache.hbase" name="hbase-zookeeper" rev="${hbase.version}" conf="common->default">
++ <artifact name="hbase-zookeeper" type="jar"/>
++ <artifact name="hbase-zookeeper" type="jar" ext="jar" m:classifier="tests"/>
++ <exclude org="com.sun.jersey" module="jersey-core"/>
++ <exclude org="com.sun.jersey" module="jersey-json"/>
++ <exclude org="com.sun.jersey" module="jersey-server"/>
++ <exclude org="org.apache.thrift" module="thrift"/>
++ <exclude org="log4j" module="log4j"/>
++ </dependency>
+ <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase.version}" conf="common->default">
+ <artifact name="hbase-hadoop-compat" type="jar"/>
+ <artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/>
+diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+index 032fd38a..cf97b8a6 100644
+--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
++++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+@@ -25,8 +25,11 @@
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.BufferedMutator;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
+ import org.apache.hadoop.hbase.client.Delete;
+-import org.apache.hadoop.hbase.client.HTable;
+ import org.apache.hadoop.hbase.client.Mutation;
+ import org.apache.hadoop.hbase.client.Put;
+ import org.apache.hadoop.hbase.util.Bytes;
+@@ -84,12 +87,19 @@
+ // into a Put command.
+ private PutTransformer putTransformer;
+
+- private String tableName;
+- private HTable table;
++ private Connection hbaseConnection;
++ private BufferedMutator bufferedMutator;
+
+ public HBasePutProcessor() {
+ }
+
++ HBasePutProcessor(Configuration conf, PutTransformer putTransformer, Connection hbaseConnection, BufferedMutator bufferedMutator) {
++ this.conf = conf;
++ this.putTransformer = putTransformer;
++ this.hbaseConnection = hbaseConnection;
++ this.bufferedMutator = bufferedMutator;
++ }
++
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setConf(Configuration config) {
+@@ -106,15 +116,24 @@ public void setConf(Configuration config) {
+ throw new RuntimeException("Could not instantiate PutTransformer.");
+ }
+ putTransformer.init(conf);
++ initHBaseMutator();
++ }
+
+- this.tableName = conf.get(TABLE_NAME_KEY, null);
++ private void initHBaseMutator() {
++ String tableName = conf.get(TABLE_NAME_KEY, null);
+ try {
+- this.table = new HTable(conf, this.tableName);
+- } catch (IOException ioe) {
+- throw new RuntimeException("Could not access HBase table " + tableName,
+- ioe);
++ hbaseConnection = ConnectionFactory.createConnection(conf);
++ bufferedMutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName));
++ } catch (IOException e) {
++ if (hbaseConnection != null) {
++ try {
++ hbaseConnection.close();
++ } catch (IOException connCloseException){
++ LOG.error("Cannot close HBase connection.", connCloseException);
++ }
++ }
++ throw new RuntimeException("Could not create mutator for HBase table " + tableName, e);
+ }
+- this.table.setAutoFlush(false);
+ }
+
+ @Override
+@@ -131,38 +150,54 @@ public void accept(FieldMappable record)
+ throws IOException, ProcessingException {
+ Map<String, Object> fields = record.getFieldMap();
+ List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
+- if (null != mutationList) {
+- for (Mutation mutation : mutationList) {
+- if (mutation!=null) {
+- if(mutation instanceof Put) {
+- Put putObject = (Put) mutation;
+- if (putObject.isEmpty()) {
+- LOG.warn("Could not insert row with no columns "
+- + "for row-key column: " + Bytes.toString(putObject.getRow()));
+- } else {
+- this.table.put(putObject);
+- }
+- } else if(mutation instanceof Delete) {
+- Delete deleteObject = (Delete) mutation;
+- if (deleteObject.isEmpty()) {
+- LOG.warn("Could not delete row with no columns "
+- + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
+- } else {
+- this.table.delete(deleteObject);
+- }
+- }
+- }
++ if (mutationList == null) {
++ return;
++ }
++ for (Mutation mutation : mutationList) {
++ if (!canAccept(mutation)) {
++ continue;
++ }
++ if (!mutation.isEmpty()) {
++ bufferedMutator.mutate(mutation);
++ } else {
++ logEmptyMutation(mutation);
+ }
+ }
+ }
+
++ private void logEmptyMutation(Mutation mutation) {
++ String action = null;
++ if (mutation instanceof Put) {
++ action = "insert";
++ } else if (mutation instanceof Delete) {
++ action = "delete";
++ }
++ LOG.warn("Could not " + action + " row with no columns "
++ + "for row-key column: " + Bytes.toString(mutation.getRow()));
++ }
++
++ private boolean canAccept(Mutation mutation) {
++ return mutation != null && (mutation instanceof Put || mutation instanceof Delete);
++ }
++
+ @Override
+ /**
+ * Closes the HBase table and commits all pending operations.
+ */
+ public void close() throws IOException {
+- this.table.flushCommits();
+- this.table.close();
++ try {
++ bufferedMutator.flush();
++ } finally {
++ try {
++ bufferedMutator.close();
++ } finally {
++ try {
++ hbaseConnection.close();
++ } catch (IOException e) {
++ LOG.error("Cannot close HBase connection.", e);
++ }
++ }
++ }
+ }
+
+ }
+diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+index 2bbfffe0..ed89aeb1 100644
+--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
++++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+@@ -26,10 +26,14 @@
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.FsPermission;
+-import org.apache.hadoop.hbase.client.HTable;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.Admin;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
+ import org.apache.hadoop.hbase.client.Put;
++import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
++import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+ import org.apache.hadoop.mapreduce.Job;
+@@ -50,6 +54,8 @@
+ public static final Log LOG = LogFactory.getLog(
+ HBaseBulkImportJob.class.getName());
+
++ private Connection hbaseConnection;
++
+ public HBaseBulkImportJob(final SqoopOptions opts,
+ final ImportJobContext importContext) {
+ super(opts, importContext);
+@@ -81,8 +87,21 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
+ FileOutputFormat.setOutputPath(job, getContext().getDestination());
+- HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+- HFileOutputFormat.configureIncrementalLoad(job, hTable);
++ TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
++ hbaseConnection = ConnectionFactory.createConnection(job.getConfiguration());
++
++ try (
++ Table hbaseTable = hbaseConnection.getTable(hbaseTableName)
++ ) {
++ HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
++ } catch (IOException | RuntimeException e) {
++ try {
++ hbaseConnection.close();
++ } catch (IOException ioException) {
++ LOG.error("Cannot close HBase connection.", ioException);
++ }
++ throw e;
++ }
+ }
+
+ /**
+@@ -99,15 +118,16 @@ protected void completeImport(Job job) throws IOException, ImportException {
+ setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
+ FsPermission.createImmutable((short) 00777));
+
+- HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
++ TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
+
+ // Load generated HFiles into table
+- try {
+- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+- job.getConfiguration());
+- loader.doBulkLoad(bulkLoadDir, hTable);
+- }
+- catch (Exception e) {
++ try (
++ Table hbaseTable = hbaseConnection.getTable(hbaseTableName);
++ Admin hbaseAdmin = hbaseConnection.getAdmin()
++ ) {
++ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(job.getConfiguration());
++ loader.doBulkLoad(bulkLoadDir, hbaseAdmin, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
++ } catch (Exception e) {
+ String errorMessage = String.format("Unrecoverable error while " +
+ "performing the bulk load of files in [%s]",
+ bulkLoadDir.toString());
+@@ -117,11 +137,19 @@ protected void completeImport(Job job) throws IOException, ImportException {
+
+ @Override
+ protected void jobTeardown(Job job) throws IOException, ImportException {
+- super.jobTeardown(job);
+- // Delete the hfiles directory after we are finished.
+- Path destination = getContext().getDestination();
+- FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+- fileSystem.delete(destination, true);
++ try {
++ super.jobTeardown(job);
++ // Delete the hfiles directory after we are finished.
++ Path destination = getContext().getDestination();
++ FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
++ fileSystem.delete(destination, true);
++ } finally {
++ try {
++ hbaseConnection.close();
++ } catch (IOException e) {
++ LOG.error("Cannot close HBase connection.", e);
++ }
++ }
+ }
+
+ /**
+diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+index 523d0a7e..5adb7883 100644
+--- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
++++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+@@ -19,7 +19,6 @@
+ package org.apache.sqoop.mapreduce;
+
+ import java.io.IOException;
+-import java.lang.reflect.InvocationTargetException;
+ import java.lang.reflect.Method;
+
+ import org.apache.commons.logging.Log;
+@@ -28,10 +27,14 @@
+ import org.apache.hadoop.hbase.HBaseConfiguration;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+-import org.apache.hadoop.hbase.client.HBaseAdmin;
+-import org.apache.hadoop.hbase.client.HTable;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.Admin;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
++import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+ import org.apache.hadoop.hbase.security.User;
++import org.apache.hadoop.hbase.security.token.TokenUtil;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.mapreduce.Job;
+@@ -160,65 +163,28 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+ HBaseConfiguration.addHbaseResources(conf);
+ }
+
+- HBaseAdmin admin = new HBaseAdmin(conf);
++ Connection hbaseConnection = ConnectionFactory.createConnection(conf);
++ Admin admin = hbaseConnection.getAdmin();
+
+ if (!skipDelegationTokens(conf)) {
+- // Add authentication token to the job if we're running on secure cluster.
+- //
+- // We're currently supporting HBase version 0.90 that do not have security
+- // patches which means that it do not have required methods
+- // "isSecurityEnabled" and "obtainAuthTokenForJob".
+- //
+- // We're using reflection API to see if those methods are available and call
+- // them only if they are present.
+- //
+- // After we will remove support for HBase 0.90 we can simplify the code to
+- // following code fragment:
+- /*
+ try {
+- if (User.isSecurityEnabled()) {
+- User user = User.getCurrent();
+- user.obtainAuthTokenForJob(conf, job);
++ if (User.isHBaseSecurityEnabled(conf)) {
++ TokenUtil.obtainTokenForJob(hbaseConnection, User.getCurrent(), job);
+ }
+ } catch(InterruptedException ex) {
+ throw new ImportException("Can't get authentication token", ex);
+ }
+- */
+- try {
+- // Get method isSecurityEnabled
+- Method isHBaseSecurityEnabled = User.class.getMethod(
+- "isHBaseSecurityEnabled", Configuration.class);
+-
+- // Get method obtainAuthTokenForJob
+- Method obtainAuthTokenForJob = User.class.getMethod(
+- "obtainAuthTokenForJob", Configuration.class, Job.class);
+-
+- // Get current user
+- User user = User.getCurrent();
+-
+- // Obtain security token if needed
+- if ((Boolean)isHBaseSecurityEnabled.invoke(null, conf)) {
+- obtainAuthTokenForJob.invoke(user, conf, job);
+- }
+- } catch (NoSuchMethodException e) {
+- LOG.info("It seems that we're running on HBase without security"
+- + " additions. Security additions will not be used during this job.");
+- } catch (InvocationTargetException e) {
+- throw new ImportException("Can't get authentication token", e);
+- } catch (IllegalAccessException e) {
+- throw new ImportException("Can't get authentication token", e);
+- }
+ }
+
+ // Check to see if the table exists.
+ HTableDescriptor tableDesc = null;
+ byte [] familyBytes = Bytes.toBytes(familyName);
+ HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
+- if (!admin.tableExists(tableName)) {
++ if (!admin.tableExists(TableName.valueOf(tableName))) {
+ if (options.getCreateHBaseTable()) {
+ // Create the table.
+ LOG.info("Creating missing HBase table " + tableName);
+- tableDesc = new HTableDescriptor(tableName);
++ tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+ tableDesc.addFamily(colDesc);
+ admin.createTable(tableDesc);
+ } else {
+@@ -228,16 +194,16 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+ }
+ } else {
+ // Table exists, so retrieve their current version
+- tableDesc = admin.getTableDescriptor(Bytes.toBytes(tableName));
++ tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
+
+ // Check if current version do have specified column family
+ if (!tableDesc.hasFamily(familyBytes)) {
+ if (options.getCreateHBaseTable()) {
+ // Create the column family.
+ LOG.info("Creating missing column family " + familyName);
+- admin.disableTable(tableName);
+- admin.addColumn(tableName, colDesc);
+- admin.enableTable(tableName);
++ admin.disableTable(TableName.valueOf(tableName));
++ admin.addColumn(TableName.valueOf(tableName), colDesc);
++ admin.enableTable(TableName.valueOf(tableName));
+ } else {
+ LOG.warn("Could not find column family " + familyName + " in table "
+ + tableName);
+@@ -250,10 +216,11 @@ protected void jobSetup(Job job) throws IOException, ImportException {
+ // Make sure we close the connection to HBA, this is only relevant in
+ // unit tests
+ admin.close();
++ hbaseConnection.close();
+
+ // Make sure HBase libraries are shipped as part of the job.
+ TableMapReduceUtil.addDependencyJars(job);
+- TableMapReduceUtil.addDependencyJars(conf, HTable.class);
++ TableMapReduceUtil.addDependencyJars(conf, Table.class);
+
+ super.jobSetup(job);
+ }
+diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+index d9f74952..9d365ebb 100644
+--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
++++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+@@ -38,6 +38,10 @@
+ import org.apache.hadoop.hbase.HBaseTestingUtility;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.MiniHBaseCluster;
++import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.ConnectionFactory;
++import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.client.Get;
+ import org.apache.hadoop.hbase.client.HTable;
+ import org.apache.hadoop.hbase.client.Result;
+@@ -236,9 +240,10 @@ protected void verifyHBaseCell(String tableName, String rowKey,
+ String colFamily, String colName, String val) throws IOException {
+ Get get = new Get(Bytes.toBytes(rowKey));
+ get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
+- HTable table = new HTable(new Configuration(
+- hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
+- try {
++ try (
++ Connection hbaseConnection = createHBaseConnection();
++ Table table = getHBaseTable(hbaseConnection, tableName)
++ ) {
+ Result r = table.get(get);
+ byte [] actualVal = r.getValue(Bytes.toBytes(colFamily),
+ Bytes.toBytes(colName));
+@@ -248,8 +253,6 @@ protected void verifyHBaseCell(String tableName, String rowKey,
+ assertNotNull("No result, but we expected one", actualVal);
+ assertEquals(val, Bytes.toString(actualVal));
+ }
+- } finally {
+- table.close();
+ }
+ }
+ public static File createTempDir() {
+@@ -264,18 +267,25 @@ public static File createTempDir() {
+ protected int countHBaseTable(String tableName, String colFamily)
+ throws IOException {
+ int count = 0;
+- HTable table = new HTable(new Configuration(
+- hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
+- try {
++ try (
++ Connection hbaseConnection = createHBaseConnection();
++ Table table = getHBaseTable(hbaseConnection, tableName)
++ ) {
+ ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily));
+ for(Result result = scanner.next();
+ result != null;
+ result = scanner.next()) {
+ count++;
+ }
+- } finally {
+- table.close();
+ }
+ return count;
+ }
++
++ private Connection createHBaseConnection() throws IOException {
++ return ConnectionFactory.createConnection(new Configuration(hbaseTestUtil.getConfiguration()));
++ }
++
++ private Table getHBaseTable(Connection connection, String tableName) throws IOException {
++ return connection.getTable(TableName.valueOf(tableName));
++ }
+ }
+diff --git a/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
+new file mode 100644
+index 00000000..73b31772
+--- /dev/null
++++ b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
+@@ -0,0 +1,133 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.sqoop.hbase;
++
++import com.cloudera.sqoop.lib.FieldMappable;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hbase.client.BufferedMutator;
++import org.apache.hadoop.hbase.client.Connection;
++import org.apache.hadoop.hbase.client.Delete;
++import org.apache.hadoop.hbase.client.Mutation;
++import org.apache.hadoop.hbase.client.Put;
++import org.apache.sqoop.util.ExpectedLogMessage;
++import org.junit.Before;
++import org.junit.Rule;
++import org.junit.Test;
++
++import java.util.Arrays;
++import java.util.List;
++
++import static java.util.Collections.singletonList;
++import static org.mockito.Matchers.anyMap;
++import static org.mockito.Mockito.mock;
++import static org.mockito.Mockito.verify;
++import static org.mockito.Mockito.verifyNoMoreInteractions;
++import static org.mockito.Mockito.when;
++
++public class TestHBasePutProcessor {
++
++ @Rule
++ public ExpectedLogMessage expectedLogMessage = new ExpectedLogMessage();
++
++ private Configuration configuration;
++ private Connection hbaseConnection;
++ private PutTransformer putTransformer;
++ private BufferedMutator bufferedMutator;
++ private FieldMappable fieldMappable;
++
++ private HBasePutProcessor hBasePutProcessor;
++
++ @Before
++ public void before() {
++ configuration = mock(Configuration.class);
++ hbaseConnection = mock(Connection.class);
++ putTransformer = mock(PutTransformer.class);
++ bufferedMutator = mock(BufferedMutator.class);
++ fieldMappable = mock(FieldMappable.class);
++
++ hBasePutProcessor = new HBasePutProcessor(configuration, putTransformer, hbaseConnection, bufferedMutator);
++ }
++
++ @Test
++ public void testNoMutationIsDoneWhenNullListIsReceived() throws Exception {
++ when(putTransformer.getMutationCommand(anyMap())).thenReturn(null);
++ verifyNoMoreInteractions(bufferedMutator);
++
++ hBasePutProcessor.accept(fieldMappable);
++ }
++
++ @Test
++ public void testNoMutationIsDoneWhenListContainingNullsIsReceived() throws Exception {
++ List<Mutation> inputList = Arrays.asList(null, null, null);
++ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++ verifyNoMoreInteractions(bufferedMutator);
++
++ hBasePutProcessor.accept(fieldMappable);
++ }
++
++ @Test
++ public void testNoMutationIsDoneWhenListContainingUnknownMutationIsReceived() throws Exception {
++ List<Mutation> inputList = singletonList(mock(Mutation.class));
++ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++ verifyNoMoreInteractions(bufferedMutator);
++
++ hBasePutProcessor.accept(fieldMappable);
++ }
++
++ @Test
++ public void testWarningIsLoggedWhenListContainingEmptyPutIsReceived() throws Exception {
++ Mutation emptyPutMutation = mock(Put.class);
++ when(emptyPutMutation.getRow()).thenReturn("emptyPutMutation".getBytes());
++ when(emptyPutMutation.isEmpty()).thenReturn(true);
++ List<Mutation> inputList = singletonList(emptyPutMutation);
++ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++ verifyNoMoreInteractions(bufferedMutator);
++ expectedLogMessage.expectWarn("Could not insert row with no columns for row-key column: emptyPutMutation");
++
++ hBasePutProcessor.accept(fieldMappable);
++ }
++
++ @Test
++ public void testWarningIsLoggedWhenListContainingEmptyDeleteIsReceived() throws Exception {
++ Mutation emptyDeleteMutation = mock(Delete.class);
++ when(emptyDeleteMutation.getRow()).thenReturn("emptyDeleteMutation".getBytes());
++ when(emptyDeleteMutation.isEmpty()).thenReturn(true);
++ List<Mutation> inputList = singletonList(emptyDeleteMutation);
++ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++ verifyNoMoreInteractions(bufferedMutator);
++ expectedLogMessage.expectWarn("Could not delete row with no columns for row-key column: emptyDeleteMutation");
++
++ hBasePutProcessor.accept(fieldMappable);
++ }
++
++ @Test
++ public void testMutationIsDoneForAllElementsWhenListContainingValidMutationsIsReceived() throws Exception {
++ Mutation aPutMutation = mock(Put.class);
++ Mutation aDeleteMutation = mock(Delete.class);
++ Mutation anotherPutMutation = mock(Put.class);
++ List<Mutation> inputList = Arrays.asList(aPutMutation, aDeleteMutation, anotherPutMutation);
++ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
++
++ hBasePutProcessor.accept(fieldMappable);
++
++ verify(bufferedMutator).mutate(aPutMutation);
++ verify(bufferedMutator).mutate(aDeleteMutation);
++ verify(bufferedMutator).mutate(anotherPutMutation);
++ }
++
++}
+\ No newline at end of file
diff --git a/bigtop.bom b/bigtop.bom
index cb12bc0..5b2a761 100644
--- a/bigtop.bom
+++ b/bigtop.bom
@@ -200,7 +200,7 @@
'sqoop' {
name = 'sqoop'
relNotes = 'Apache Sqoop v1'
- version { base = '1.4.6'; pkg = base; release = 1 }
+ version { base = '1.4.7'; pkg = base; release = 1 }
tarball { destination = "${name}-${version.base}.tar.gz"
source = destination }
url { download_path = "/$name/${version.base}/"