diff --git a/ivy.xml b/ivy.xml
index e4b45bfd..263bcc81 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -162,6 +162,15 @@ under the License.
<exclude org="org.apache.thrift" module="thrift"/>
<exclude org="log4j" module="log4j"/>
</dependency>
+ <dependency org="org.apache.hbase" name="hbase-zookeeper" rev="${hbase.version}" conf="common->default">
+ <artifact name="hbase-zookeeper" type="jar"/>
+ <artifact name="hbase-zookeeper" type="jar" ext="jar" m:classifier="tests"/>
+ <exclude org="com.sun.jersey" module="jersey-core"/>
+ <exclude org="com.sun.jersey" module="jersey-json"/>
+ <exclude org="com.sun.jersey" module="jersey-server"/>
+ <exclude org="org.apache.thrift" module="thrift"/>
+ <exclude org="log4j" module="log4j"/>
+ </dependency>
<dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase.version}" conf="common->default">
<artifact name="hbase-hadoop-compat" type="jar"/>
<artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/>
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 032fd38a..cf97b8a6 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -25,8 +25,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
@@ -84,12 +87,19 @@
// into a Put command.
private PutTransformer putTransformer;
- private String tableName;
- private HTable table;
+ private Connection hbaseConnection;
+ private BufferedMutator bufferedMutator;
public HBasePutProcessor() {
}
+ HBasePutProcessor(Configuration conf, PutTransformer putTransformer, Connection hbaseConnection, BufferedMutator bufferedMutator) {
+ this.conf = conf;
+ this.putTransformer = putTransformer;
+ this.hbaseConnection = hbaseConnection;
+ this.bufferedMutator = bufferedMutator;
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setConf(Configuration config) {
@@ -106,15 +116,24 @@ public void setConf(Configuration config) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
putTransformer.init(conf);
+ initHBaseMutator();
+ }
- this.tableName = conf.get(TABLE_NAME_KEY, null);
+ private void initHBaseMutator() {
+ String tableName = conf.get(TABLE_NAME_KEY, null);
try {
- this.table = new HTable(conf, this.tableName);
- } catch (IOException ioe) {
- throw new RuntimeException("Could not access HBase table " + tableName,
- ioe);
+ hbaseConnection = ConnectionFactory.createConnection(conf);
+ bufferedMutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName));
+ } catch (IOException e) {
+ if (hbaseConnection != null) {
+ try {
+ hbaseConnection.close();
+ } catch (IOException connCloseException) {
+ LOG.error("Cannot close HBase connection.", connCloseException);
+ }
+ }
+ throw new RuntimeException("Could not create mutator for HBase table " + tableName, e);
}
- this.table.setAutoFlush(false);
}
@Override
@@ -131,38 +150,54 @@ public void accept(FieldMappable record)
throws IOException, ProcessingException {
Map<String, Object> fields = record.getFieldMap();
List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
- if (null != mutationList) {
- for (Mutation mutation : mutationList) {
- if (mutation!=null) {
- if(mutation instanceof Put) {
- Put putObject = (Put) mutation;
- if (putObject.isEmpty()) {
- LOG.warn("Could not insert row with no columns "
- + "for row-key column: " + Bytes.toString(putObject.getRow()));
- } else {
- this.table.put(putObject);
- }
- } else if(mutation instanceof Delete) {
- Delete deleteObject = (Delete) mutation;
- if (deleteObject.isEmpty()) {
- LOG.warn("Could not delete row with no columns "
- + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
- } else {
- this.table.delete(deleteObject);
- }
- }
- }
+ if (mutationList == null) {
+ return;
+ }
+ for (Mutation mutation : mutationList) {
+ if (!canAccept(mutation)) {
+ continue;
+ }
+ if (!mutation.isEmpty()) {
+ bufferedMutator.mutate(mutation);
+ } else {
+ logEmptyMutation(mutation);
}
}
}
+ private void logEmptyMutation(Mutation mutation) {
+ String action = null;
+ if (mutation instanceof Put) {
+ action = "insert";
+ } else if (mutation instanceof Delete) {
+ action = "delete";
+ }
+ LOG.warn("Could not " + action + " row with no columns "
+ + "for row-key column: " + Bytes.toString(mutation.getRow()));
+ }
+
+ private boolean canAccept(Mutation mutation) {
+ return mutation != null && (mutation instanceof Put || mutation instanceof Delete);
+ }
+
@Override
/**
* Closes the HBase table and commits all pending operations.
*/
public void close() throws IOException {
- this.table.flushCommits();
- this.table.close();
+ try {
+ bufferedMutator.flush();
+ } finally {
+ try {
+ bufferedMutator.close();
+ } finally {
+ try {
+ hbaseConnection.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close HBase connection.", e);
+ }
+ }
+ }
}
}
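The write path above replaces the deprecated HTable with a BufferedMutator obtained from a shared Connection; the old setAutoFlush(false)/flushCommits() pair becomes client-side buffering plus an explicit flush(). For reference, a minimal self-contained sketch of that idiom follows; the table name "my_table" and the row/column names are illustrative assumptions, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedMutatorSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Connection is heavyweight and thread-safe; BufferedMutator buffers
    // mutations client-side and sends them in batches, replacing the
    // old HTable#setAutoFlush(false) + flushCommits() idiom.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator =
             connection.getBufferedMutator(TableName.valueOf("my_table"))) {
      Put put = new Put(Bytes.toBytes("row1"));
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("val"));
      mutator.mutate(put); // buffered locally, not yet sent
      mutator.flush();     // push all pending mutations to the region servers
    }
  }
}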
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
index 2bbfffe0..ed89aeb1 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -26,10 +26,14 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
@@ -50,6 +54,8 @@
public static final Log LOG = LogFactory.getLog(
HBaseBulkImportJob.class.getName());
+ private Connection hbaseConnection;
+
public HBaseBulkImportJob(final SqoopOptions opts,
final ImportJobContext importContext) {
super(opts, importContext);
@@ -81,8 +87,21 @@ protected void jobSetup(Job job) throws IOException, ImportException {
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
FileOutputFormat.setOutputPath(job, getContext().getDestination());
- HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
- HFileOutputFormat.configureIncrementalLoad(job, hTable);
+ TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
+ hbaseConnection = ConnectionFactory.createConnection(job.getConfiguration());
+
+ try (
+ Table hbaseTable = hbaseConnection.getTable(hbaseTableName)
+ ) {
+ HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
+ } catch (IOException | RuntimeException e) {
+ try {
+ hbaseConnection.close();
+ } catch (IOException ioException) {
+ LOG.error("Cannot close HBase connection.", ioException);
+ }
+ throw e;
+ }
}
/**
@@ -99,15 +118,16 @@ protected void completeImport(Job job) throws IOException, ImportException {
setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
FsPermission.createImmutable((short) 00777));
- HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+ TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
// Load generated HFiles into table
- try {
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
- job.getConfiguration());
- loader.doBulkLoad(bulkLoadDir, hTable);
- }
- catch (Exception e) {
+ try (
+ Table hbaseTable = hbaseConnection.getTable(hbaseTableName);
+ Admin hbaseAdmin = hbaseConnection.getAdmin()
+ ) {
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(job.getConfiguration());
+ loader.doBulkLoad(bulkLoadDir, hbaseAdmin, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
+ } catch (Exception e) {
String errorMessage = String.format("Unrecoverable error while " +
"performing the bulk load of files in [%s]",
bulkLoadDir.toString());
@@ -117,11 +137,19 @@ protected void completeImport(Job job) throws IOException, ImportException {
@Override
protected void jobTeardown(Job job) throws IOException, ImportException {
- super.jobTeardown(job);
- // Delete the hfiles directory after we are finished.
- Path destination = getContext().getDestination();
- FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
- fileSystem.delete(destination, true);
+ try {
+ super.jobTeardown(job);
+ // Delete the hfiles directory after we are finished.
+ Path destination = getContext().getDestination();
+ FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+ fileSystem.delete(destination, true);
+ } finally {
+ try {
+ hbaseConnection.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close HBase connection.", e);
+ }
+ }
}
/**
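The bulk-load changes above swap the deprecated HFileOutputFormat for HFileOutputFormat2, which needs an explicit RegionLocator to compute region boundaries, and move to the doBulkLoad overload that takes Admin, Table and RegionLocator. A minimal sketch of that flow, with the table name "my_table" and the HFile directory "/tmp/hfiles" as assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("my_table"); // assumption
    Job job = Job.getInstance(conf, "hfile-writer");
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(tableName);
         RegionLocator regionLocator = connection.getRegionLocator(tableName);
         Admin admin = connection.getAdmin()) {
      // Partition and order the job output to match the table's regions.
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
      // ... the MapReduce job would run here and write HFiles ...
      // Then move the generated HFiles into the table.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.doBulkLoad(new Path("/tmp/hfiles"), admin, table, regionLocator);
    }
  }
}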
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
index 523d0a7e..5adb7883 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
@@ -19,7 +19,6 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
@@ -28,10 +27,14 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
@@ -160,65 +163,28 @@ protected void jobSetup(Job job) throws IOException, ImportException {
HBaseConfiguration.addHbaseResources(conf);
}
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Connection hbaseConnection = ConnectionFactory.createConnection(conf);
+ Admin admin = hbaseConnection.getAdmin();
if (!skipDelegationTokens(conf)) {
- // Add authentication token to the job if we're running on secure cluster.
- //
- // We're currently supporting HBase version 0.90 that do not have security
- // patches which means that it do not have required methods
- // "isSecurityEnabled" and "obtainAuthTokenForJob".
- //
- // We're using reflection API to see if those methods are available and call
- // them only if they are present.
- //
- // After we will remove support for HBase 0.90 we can simplify the code to
- // following code fragment:
- /*
try {
- if (User.isSecurityEnabled()) {
- User user = User.getCurrent();
- user.obtainAuthTokenForJob(conf, job);
+ if (User.isHBaseSecurityEnabled(conf)) {
+ TokenUtil.obtainTokenForJob(hbaseConnection, User.getCurrent(), job);
}
} catch(InterruptedException ex) {
throw new ImportException("Can't get authentication token", ex);
}
- */
- try {
- // Get method isSecurityEnabled
- Method isHBaseSecurityEnabled = User.class.getMethod(
- "isHBaseSecurityEnabled", Configuration.class);
-
- // Get method obtainAuthTokenForJob
- Method obtainAuthTokenForJob = User.class.getMethod(
- "obtainAuthTokenForJob", Configuration.class, Job.class);
-
- // Get current user
- User user = User.getCurrent();
-
- // Obtain security token if needed
- if ((Boolean)isHBaseSecurityEnabled.invoke(null, conf)) {
- obtainAuthTokenForJob.invoke(user, conf, job);
- }
- } catch (NoSuchMethodException e) {
- LOG.info("It seems that we're running on HBase without security"
- + " additions. Security additions will not be used during this job.");
- } catch (InvocationTargetException e) {
- throw new ImportException("Can't get authentication token", e);
- } catch (IllegalAccessException e) {
- throw new ImportException("Can't get authentication token", e);
- }
}
// Check to see if the table exists.
HTableDescriptor tableDesc = null;
byte [] familyBytes = Bytes.toBytes(familyName);
HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
- if (!admin.tableExists(tableName)) {
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
if (options.getCreateHBaseTable()) {
// Create the table.
LOG.info("Creating missing HBase table " + tableName);
- tableDesc = new HTableDescriptor(tableName);
+ tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
tableDesc.addFamily(colDesc);
admin.createTable(tableDesc);
} else {
@@ -228,16 +194,16 @@ protected void jobSetup(Job job) throws IOException, ImportException {
}
} else {
// Table exists, so retrieve their current version
- tableDesc = admin.getTableDescriptor(Bytes.toBytes(tableName));
+ tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
// Check if current version do have specified column family
if (!tableDesc.hasFamily(familyBytes)) {
if (options.getCreateHBaseTable()) {
// Create the column family.
LOG.info("Creating missing column family " + familyName);
- admin.disableTable(tableName);
- admin.addColumn(tableName, colDesc);
- admin.enableTable(tableName);
+ admin.disableTable(TableName.valueOf(tableName));
+ admin.addColumn(TableName.valueOf(tableName), colDesc);
+ admin.enableTable(TableName.valueOf(tableName));
} else {
LOG.warn("Could not find column family " + familyName + " in table "
+ tableName);
@@ -250,10 +216,11 @@ protected void jobSetup(Job job) throws IOException, ImportException {
// Make sure we close the connection to HBase; this is only relevant in
// unit tests
admin.close();
+ hbaseConnection.close();
// Make sure HBase libraries are shipped as part of the job.
TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.addDependencyJars(conf, HTable.class);
+ TableMapReduceUtil.addDependencyJars(conf, Table.class);
super.jobSetup(job);
}
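With the reflection shims for pre-security HBase 0.90 removed, token acquisition above reduces to two direct calls. A minimal sketch, assuming the MapReduce Job is supplied by the caller:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.mapreduce.Job;

public class ObtainTokenSketch {
  // Adds an HBase delegation token to the job credentials when the
  // cluster is secure; a no-op otherwise.
  static void addHBaseToken(Job job) throws Exception {
    Configuration conf = HBaseConfiguration.create(job.getConfiguration());
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      if (User.isHBaseSecurityEnabled(conf)) {
        TokenUtil.obtainTokenForJob(connection, User.getCurrent(), job);
      }
    }
  }
}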
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
index d9f74952..9d365ebb 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
@@ -38,6 +38,10 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -236,9 +240,10 @@ protected void verifyHBaseCell(String tableName, String rowKey,
String colFamily, String colName, String val) throws IOException {
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
- HTable table = new HTable(new Configuration(
- hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
- try {
+ try (
+ Connection hbaseConnection = createHBaseConnection();
+ Table table = getHBaseTable(hbaseConnection, tableName)
+ ) {
Result r = table.get(get);
byte [] actualVal = r.getValue(Bytes.toBytes(colFamily),
Bytes.toBytes(colName));
@@ -248,8 +253,6 @@ protected void verifyHBaseCell(String tableName, String rowKey,
assertNotNull("No result, but we expected one", actualVal);
assertEquals(val, Bytes.toString(actualVal));
}
- } finally {
- table.close();
}
}
public static File createTempDir() {
@@ -264,18 +267,25 @@ public static File createTempDir() {
protected int countHBaseTable(String tableName, String colFamily)
throws IOException {
int count = 0;
- HTable table = new HTable(new Configuration(
- hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
- try {
+ try (
+ Connection hbaseConnection = createHBaseConnection();
+ Table table = getHBaseTable(hbaseConnection, tableName)
+ ) {
ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily));
for(Result result = scanner.next();
result != null;
result = scanner.next()) {
count++;
}
- } finally {
- table.close();
}
return count;
}
+
+ private Connection createHBaseConnection() throws IOException {
+ return ConnectionFactory.createConnection(new Configuration(hbaseTestUtil.getConfiguration()));
+ }
+
+ private Table getHBaseTable(Connection connection, String tableName) throws IOException {
+ return connection.getTable(TableName.valueOf(tableName));
+ }
}
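The rewritten test helpers rely on Connection and Table being AutoCloseable, so try-with-resources replaces the explicit finally { table.close(); } blocks used before. A stand-alone sketch of the same read path, with table/row/column names as assumptions:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class TableReadSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Both resources are closed in reverse order even if get() throws.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("my_table"))) {
      Get get = new Get(Bytes.toBytes("row1"));
      get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"));
      Result result = table.get(get);
      byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"));
      System.out.println(value == null ? "<not found>" : Bytes.toString(value));
    }
  }
}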
diff --git a/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
new file mode 100644
index 00000000..73b31772
--- /dev/null
+++ b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.hbase;
+
+import com.cloudera.sqoop.lib.FieldMappable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.sqoop.util.ExpectedLogMessage;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestHBasePutProcessor {
+
+ @Rule
+ public ExpectedLogMessage expectedLogMessage = new ExpectedLogMessage();
+
+ private Configuration configuration;
+ private Connection hbaseConnection;
+ private PutTransformer putTransformer;
+ private BufferedMutator bufferedMutator;
+ private FieldMappable fieldMappable;
+
+ private HBasePutProcessor hBasePutProcessor;
+
+ @Before
+ public void before() {
+ configuration = mock(Configuration.class);
+ hbaseConnection = mock(Connection.class);
+ putTransformer = mock(PutTransformer.class);
+ bufferedMutator = mock(BufferedMutator.class);
+ fieldMappable = mock(FieldMappable.class);
+
+ hBasePutProcessor = new HBasePutProcessor(configuration, putTransformer, hbaseConnection, bufferedMutator);
+ }
+
+ @Test
+ public void testNoMutationIsDoneWhenNullListIsReceived() throws Exception {
+ when(putTransformer.getMutationCommand(anyMap())).thenReturn(null);
+
+ hBasePutProcessor.accept(fieldMappable);
+ verifyNoMoreInteractions(bufferedMutator);
+ }
+
+ @Test
+ public void testNoMutationIsDoneWhenListContainingNullsIsReceived() throws Exception {
+ List<Mutation> inputList = Arrays.asList(null, null, null);
+ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+
+ hBasePutProcessor.accept(fieldMappable);
+ verifyNoMoreInteractions(bufferedMutator);
+ }
+
+ @Test
+ public void testNoMutationIsDoneWhenListContainingUnknownMutationIsReceived() throws Exception {
+ List<Mutation> inputList = singletonList(mock(Mutation.class));
+ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+
+ hBasePutProcessor.accept(fieldMappable);
+ verifyNoMoreInteractions(bufferedMutator);
+ }
+
+ @Test
+ public void testWarningIsLoggedWhenListContainingEmptyPutIsReceived() throws Exception {
+ Mutation emptyPutMutation = mock(Put.class);
+ when(emptyPutMutation.getRow()).thenReturn("emptyPutMutation".getBytes());
+ when(emptyPutMutation.isEmpty()).thenReturn(true);
+ List<Mutation> inputList = singletonList(emptyPutMutation);
+ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+ expectedLogMessage.expectWarn("Could not insert row with no columns for row-key column: emptyPutMutation");
+
+ hBasePutProcessor.accept(fieldMappable);
+ verifyNoMoreInteractions(bufferedMutator);
+ }
+
+ @Test
+ public void testWarningIsLoggedWhenListContainingEmptyDeleteIsReceived() throws Exception {
+ Mutation emptyDeleteMutation = mock(Delete.class);
+ when(emptyDeleteMutation.getRow()).thenReturn("emptyDeleteMutation".getBytes());
+ when(emptyDeleteMutation.isEmpty()).thenReturn(true);
+ List<Mutation> inputList = singletonList(emptyDeleteMutation);
+ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+ expectedLogMessage.expectWarn("Could not delete row with no columns for row-key column: emptyDeleteMutation");
+
+ hBasePutProcessor.accept(fieldMappable);
+ verifyNoMoreInteractions(bufferedMutator);
+ }
+
+ @Test
+ public void testMutationIsDoneForAllElementsWhenListContainingValidMutationsIsReceived() throws Exception {
+ Mutation aPutMutation = mock(Put.class);
+ Mutation aDeleteMutation = mock(Delete.class);
+ Mutation anotherPutMutation = mock(Put.class);
+ List<Mutation> inputList = Arrays.asList(aPutMutation, aDeleteMutation, anotherPutMutation);
+ when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+
+ hBasePutProcessor.accept(fieldMappable);
+
+ verify(bufferedMutator).mutate(aPutMutation);
+ verify(bufferedMutator).mutate(aDeleteMutation);
+ verify(bufferedMutator).mutate(anotherPutMutation);
+ }
+
+}
\ No newline at end of file