| diff --git a/ivy.xml b/ivy.xml |
| index e4b45bfd..263bcc81 100644 |
| --- a/ivy.xml |
| +++ b/ivy.xml |
| @@ -162,6 +162,15 @@ under the License. |
| <exclude org="org.apache.thrift" module="thrift"/> |
| <exclude org="log4j" module="log4j"/> |
| </dependency> |
| + <dependency org="org.apache.hbase" name="hbase-zookeeper" rev="${hbase.version}" conf="common->default"> |
| + <artifact name="hbase-zookeeper" type="jar"/> |
| + <artifact name="hbase-zookeeper" type="jar" ext="jar" m:classifier="tests"/> |
| + <exclude org="com.sun.jersey" module="jersey-core"/> |
| + <exclude org="com.sun.jersey" module="jersey-json"/> |
| + <exclude org="com.sun.jersey" module="jersey-server"/> |
| + <exclude org="org.apache.thrift" module="thrift"/> |
| + <exclude org="log4j" module="log4j"/> |
| + </dependency> |
| <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase.version}" conf="common->default"> |
| <artifact name="hbase-hadoop-compat" type="jar"/> |
| <artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/> |
| diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java |
| index 032fd38a..cf97b8a6 100644 |
| --- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java |
| +++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java |
| @@ -25,8 +25,11 @@ |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| +import org.apache.hadoop.hbase.TableName; |
| +import org.apache.hadoop.hbase.client.BufferedMutator; |
| +import org.apache.hadoop.hbase.client.Connection; |
| +import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Delete; |
| -import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.util.Bytes; |
| @@ -84,12 +87,19 @@ |
| // into a Put command. |
| private PutTransformer putTransformer; |
| |
| - private String tableName; |
| - private HTable table; |
| + private Connection hbaseConnection; |
| + private BufferedMutator bufferedMutator; |
| |
| public HBasePutProcessor() { |
| } |
| |
| + HBasePutProcessor(Configuration conf, PutTransformer putTransformer, Connection hbaseConnection, BufferedMutator bufferedMutator) { |
| + this.conf = conf; |
| + this.putTransformer = putTransformer; |
| + this.hbaseConnection = hbaseConnection; |
| + this.bufferedMutator = bufferedMutator; |
| + } |
| + |
| @Override |
| @SuppressWarnings("unchecked") |
| public void setConf(Configuration config) { |
| @@ -106,15 +116,24 @@ public void setConf(Configuration config) { |
| throw new RuntimeException("Could not instantiate PutTransformer."); |
| } |
| putTransformer.init(conf); |
| + initHBaseMutator(); |
| + } |
| |
| - this.tableName = conf.get(TABLE_NAME_KEY, null); |
| + private void initHBaseMutator() { |
| + String tableName = conf.get(TABLE_NAME_KEY, null); |
| try { |
| - this.table = new HTable(conf, this.tableName); |
| - } catch (IOException ioe) { |
| - throw new RuntimeException("Could not access HBase table " + tableName, |
| - ioe); |
| + hbaseConnection = ConnectionFactory.createConnection(conf); |
| + bufferedMutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName)); |
| + } catch (IOException e) { |
| + if (hbaseConnection != null) { |
| + try { |
| + hbaseConnection.close(); |
| + } catch (IOException connCloseException){ |
| + LOG.error("Cannot close HBase connection.", connCloseException); |
| + } |
| + } |
| + throw new RuntimeException("Could not create mutator for HBase table " + tableName, e); |
| } |
| - this.table.setAutoFlush(false); |
| } |
| |
| @Override |
| @@ -131,38 +150,54 @@ public void accept(FieldMappable record) |
| throws IOException, ProcessingException { |
| Map<String, Object> fields = record.getFieldMap(); |
| List<Mutation> mutationList = putTransformer.getMutationCommand(fields); |
| - if (null != mutationList) { |
| - for (Mutation mutation : mutationList) { |
| - if (mutation!=null) { |
| - if(mutation instanceof Put) { |
| - Put putObject = (Put) mutation; |
| - if (putObject.isEmpty()) { |
| - LOG.warn("Could not insert row with no columns " |
| - + "for row-key column: " + Bytes.toString(putObject.getRow())); |
| - } else { |
| - this.table.put(putObject); |
| - } |
| - } else if(mutation instanceof Delete) { |
| - Delete deleteObject = (Delete) mutation; |
| - if (deleteObject.isEmpty()) { |
| - LOG.warn("Could not delete row with no columns " |
| - + "for row-key column: " + Bytes.toString(deleteObject.getRow())); |
| - } else { |
| - this.table.delete(deleteObject); |
| - } |
| - } |
| - } |
| + if (mutationList == null) { |
| + return; |
| + } |
| + for (Mutation mutation : mutationList) { |
| + if (!canAccept(mutation)) { |
| + continue; |
| + } |
| + if (!mutation.isEmpty()) { |
| + bufferedMutator.mutate(mutation); |
| + } else { |
| + logEmptyMutation(mutation); |
| } |
| } |
| } |
| |
| + private void logEmptyMutation(Mutation mutation) { |
| + String action = null; |
| + if (mutation instanceof Put) { |
| + action = "insert"; |
| + } else if (mutation instanceof Delete) { |
| + action = "delete"; |
| + } |
| + LOG.warn("Could not " + action + " row with no columns " |
| + + "for row-key column: " + Bytes.toString(mutation.getRow())); |
| + } |
| + |
| + private boolean canAccept(Mutation mutation) { |
| + return mutation != null && (mutation instanceof Put || mutation instanceof Delete); |
| + } |
| + |
| @Override |
| /** |
| * Closes the HBase table and commits all pending operations. |
| */ |
| public void close() throws IOException { |
| - this.table.flushCommits(); |
| - this.table.close(); |
| + try { |
| + bufferedMutator.flush(); |
| + } finally { |
| + try { |
| + bufferedMutator.close(); |
| + } finally { |
| + try { |
| + hbaseConnection.close(); |
| + } catch (IOException e) { |
| + LOG.error("Cannot close HBase connection.", e); |
| + } |
| + } |
| + } |
| } |
| |
| } |
| diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java |
| index 2bbfffe0..ed89aeb1 100644 |
| --- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java |
| +++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java |
| @@ -26,10 +26,14 @@ |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| -import org.apache.hadoop.hbase.client.HTable; |
| +import org.apache.hadoop.hbase.TableName; |
| +import org.apache.hadoop.hbase.client.Admin; |
| +import org.apache.hadoop.hbase.client.Connection; |
| +import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Put; |
| +import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; |
| +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; |
| import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; |
| import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; |
| import org.apache.hadoop.mapreduce.Job; |
| @@ -50,6 +54,8 @@ |
| public static final Log LOG = LogFactory.getLog( |
| HBaseBulkImportJob.class.getName()); |
| |
| + private Connection hbaseConnection; |
| + |
| public HBaseBulkImportJob(final SqoopOptions opts, |
| final ImportJobContext importContext) { |
| super(opts, importContext); |
| @@ -81,8 +87,21 @@ protected void jobSetup(Job job) throws IOException, ImportException { |
| |
| TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class); |
| FileOutputFormat.setOutputPath(job, getContext().getDestination()); |
| - HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable()); |
| - HFileOutputFormat.configureIncrementalLoad(job, hTable); |
| + TableName hbaseTableName = TableName.valueOf(options.getHBaseTable()); |
| + hbaseConnection = ConnectionFactory.createConnection(job.getConfiguration()); |
| + |
| + try ( |
| + Table hbaseTable = hbaseConnection.getTable(hbaseTableName) |
| + ) { |
| + HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName)); |
| + } catch (IOException | RuntimeException e) { |
| + try { |
| + hbaseConnection.close(); |
| + } catch (IOException ioException) { |
| + LOG.error("Cannot close HBase connection.", ioException); |
| + } |
| + throw e; |
| + } |
| } |
| |
| /** |
| @@ -99,15 +118,16 @@ protected void completeImport(Job job) throws IOException, ImportException { |
| setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir), |
| FsPermission.createImmutable((short) 00777)); |
| |
| - HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable()); |
| + TableName hbaseTableName = TableName.valueOf(options.getHBaseTable()); |
| |
| // Load generated HFiles into table |
| - try { |
| - LoadIncrementalHFiles loader = new LoadIncrementalHFiles( |
| - job.getConfiguration()); |
| - loader.doBulkLoad(bulkLoadDir, hTable); |
| - } |
| - catch (Exception e) { |
| + try ( |
| + Table hbaseTable = hbaseConnection.getTable(hbaseTableName); |
| + Admin hbaseAdmin = hbaseConnection.getAdmin() |
| + ) { |
| + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(job.getConfiguration()); |
| + loader.doBulkLoad(bulkLoadDir, hbaseAdmin, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName)); |
| + } catch (Exception e) { |
| String errorMessage = String.format("Unrecoverable error while " + |
| "performing the bulk load of files in [%s]", |
| bulkLoadDir.toString()); |
| @@ -117,11 +137,19 @@ protected void completeImport(Job job) throws IOException, ImportException { |
| |
| @Override |
| protected void jobTeardown(Job job) throws IOException, ImportException { |
| - super.jobTeardown(job); |
| - // Delete the hfiles directory after we are finished. |
| - Path destination = getContext().getDestination(); |
| - FileSystem fileSystem = destination.getFileSystem(job.getConfiguration()); |
| - fileSystem.delete(destination, true); |
| + try { |
| + super.jobTeardown(job); |
| + // Delete the hfiles directory after we are finished. |
| + Path destination = getContext().getDestination(); |
| + FileSystem fileSystem = destination.getFileSystem(job.getConfiguration()); |
| + fileSystem.delete(destination, true); |
| + } finally { |
| + try { |
| + hbaseConnection.close(); |
| + } catch (IOException e) { |
| + LOG.error("Cannot close HBase connection.", e); |
| + } |
| + } |
| } |
| |
| /** |
| diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java |
| index 523d0a7e..5adb7883 100644 |
| --- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java |
| +++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java |
| @@ -19,7 +19,6 @@ |
| package org.apache.sqoop.mapreduce; |
| |
| import java.io.IOException; |
| -import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| |
| import org.apache.commons.logging.Log; |
| @@ -28,10 +27,14 @@ |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| -import org.apache.hadoop.hbase.client.HBaseAdmin; |
| -import org.apache.hadoop.hbase.client.HTable; |
| +import org.apache.hadoop.hbase.TableName; |
| +import org.apache.hadoop.hbase.client.Admin; |
| +import org.apache.hadoop.hbase.client.Connection; |
| +import org.apache.hadoop.hbase.client.ConnectionFactory; |
| +import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; |
| import org.apache.hadoop.hbase.security.User; |
| +import org.apache.hadoop.hbase.security.token.TokenUtil; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.mapreduce.Job; |
| @@ -160,65 +163,28 @@ protected void jobSetup(Job job) throws IOException, ImportException { |
| HBaseConfiguration.addHbaseResources(conf); |
| } |
| |
| - HBaseAdmin admin = new HBaseAdmin(conf); |
| + Connection hbaseConnection = ConnectionFactory.createConnection(conf); |
| + Admin admin = hbaseConnection.getAdmin(); |
| |
| if (!skipDelegationTokens(conf)) { |
| - // Add authentication token to the job if we're running on secure cluster. |
| - // |
| - // We're currently supporting HBase version 0.90 that do not have security |
| - // patches which means that it do not have required methods |
| - // "isSecurityEnabled" and "obtainAuthTokenForJob". |
| - // |
| - // We're using reflection API to see if those methods are available and call |
| - // them only if they are present. |
| - // |
| - // After we will remove support for HBase 0.90 we can simplify the code to |
| - // following code fragment: |
| - /* |
| try { |
| - if (User.isSecurityEnabled()) { |
| - User user = User.getCurrent(); |
| - user.obtainAuthTokenForJob(conf, job); |
| + if (User.isHBaseSecurityEnabled(conf)) { |
| + TokenUtil.obtainTokenForJob(hbaseConnection, User.getCurrent(), job); |
| } |
| } catch(InterruptedException ex) { |
| throw new ImportException("Can't get authentication token", ex); |
| } |
| - */ |
| - try { |
| - // Get method isSecurityEnabled |
| - Method isHBaseSecurityEnabled = User.class.getMethod( |
| - "isHBaseSecurityEnabled", Configuration.class); |
| - |
| - // Get method obtainAuthTokenForJob |
| - Method obtainAuthTokenForJob = User.class.getMethod( |
| - "obtainAuthTokenForJob", Configuration.class, Job.class); |
| - |
| - // Get current user |
| - User user = User.getCurrent(); |
| - |
| - // Obtain security token if needed |
| - if ((Boolean)isHBaseSecurityEnabled.invoke(null, conf)) { |
| - obtainAuthTokenForJob.invoke(user, conf, job); |
| - } |
| - } catch (NoSuchMethodException e) { |
| - LOG.info("It seems that we're running on HBase without security" |
| - + " additions. Security additions will not be used during this job."); |
| - } catch (InvocationTargetException e) { |
| - throw new ImportException("Can't get authentication token", e); |
| - } catch (IllegalAccessException e) { |
| - throw new ImportException("Can't get authentication token", e); |
| - } |
| } |
| |
| // Check to see if the table exists. |
| HTableDescriptor tableDesc = null; |
| byte [] familyBytes = Bytes.toBytes(familyName); |
| HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes); |
| - if (!admin.tableExists(tableName)) { |
| + if (!admin.tableExists(TableName.valueOf(tableName))) { |
| if (options.getCreateHBaseTable()) { |
| // Create the table. |
| LOG.info("Creating missing HBase table " + tableName); |
| - tableDesc = new HTableDescriptor(tableName); |
| + tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); |
| tableDesc.addFamily(colDesc); |
| admin.createTable(tableDesc); |
| } else { |
| @@ -228,16 +194,16 @@ protected void jobSetup(Job job) throws IOException, ImportException { |
| } |
| } else { |
| // Table exists, so retrieve their current version |
| - tableDesc = admin.getTableDescriptor(Bytes.toBytes(tableName)); |
| + tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName)); |
| |
| // Check if current version do have specified column family |
| if (!tableDesc.hasFamily(familyBytes)) { |
| if (options.getCreateHBaseTable()) { |
| // Create the column family. |
| LOG.info("Creating missing column family " + familyName); |
| - admin.disableTable(tableName); |
| - admin.addColumn(tableName, colDesc); |
| - admin.enableTable(tableName); |
| + admin.disableTable(TableName.valueOf(tableName)); |
| + admin.addColumn(TableName.valueOf(tableName), colDesc); |
| + admin.enableTable(TableName.valueOf(tableName)); |
| } else { |
| LOG.warn("Could not find column family " + familyName + " in table " |
| + tableName); |
| @@ -250,10 +216,11 @@ protected void jobSetup(Job job) throws IOException, ImportException { |
| // Make sure we close the connection to HBA, this is only relevant in |
| // unit tests |
| admin.close(); |
| + hbaseConnection.close(); |
| |
| // Make sure HBase libraries are shipped as part of the job. |
| TableMapReduceUtil.addDependencyJars(job); |
| - TableMapReduceUtil.addDependencyJars(conf, HTable.class); |
| + TableMapReduceUtil.addDependencyJars(conf, Table.class); |
| |
| super.jobSetup(job); |
| } |
| diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java |
| index d9f74952..9d365ebb 100644 |
| --- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java |
| +++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java |
| @@ -38,6 +38,10 @@ |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.MiniHBaseCluster; |
| +import org.apache.hadoop.hbase.TableName; |
| +import org.apache.hadoop.hbase.client.Connection; |
| +import org.apache.hadoop.hbase.client.ConnectionFactory; |
| +import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.client.Result; |
| @@ -236,9 +240,10 @@ protected void verifyHBaseCell(String tableName, String rowKey, |
| String colFamily, String colName, String val) throws IOException { |
| Get get = new Get(Bytes.toBytes(rowKey)); |
| get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName)); |
| - HTable table = new HTable(new Configuration( |
| - hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName)); |
| - try { |
| + try ( |
| + Connection hbaseConnection = createHBaseConnection(); |
| + Table table = getHBaseTable(hbaseConnection, tableName) |
| + ) { |
| Result r = table.get(get); |
| byte [] actualVal = r.getValue(Bytes.toBytes(colFamily), |
| Bytes.toBytes(colName)); |
| @@ -248,8 +253,6 @@ protected void verifyHBaseCell(String tableName, String rowKey, |
| assertNotNull("No result, but we expected one", actualVal); |
| assertEquals(val, Bytes.toString(actualVal)); |
| } |
| - } finally { |
| - table.close(); |
| } |
| } |
| public static File createTempDir() { |
| @@ -264,18 +267,25 @@ public static File createTempDir() { |
| protected int countHBaseTable(String tableName, String colFamily) |
| throws IOException { |
| int count = 0; |
| - HTable table = new HTable(new Configuration( |
| - hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName)); |
| - try { |
| + try ( |
| + Connection hbaseConnection = createHBaseConnection(); |
| + Table table = getHBaseTable(hbaseConnection, tableName) |
| + ) { |
| ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily)); |
| for(Result result = scanner.next(); |
| result != null; |
| result = scanner.next()) { |
| count++; |
| } |
| - } finally { |
| - table.close(); |
| } |
| return count; |
| } |
| + |
| + private Connection createHBaseConnection() throws IOException { |
| + return ConnectionFactory.createConnection(new Configuration(hbaseTestUtil.getConfiguration())); |
| + } |
| + |
| + private Table getHBaseTable(Connection connection, String tableName) throws IOException { |
| + return connection.getTable(TableName.valueOf(tableName)); |
| + } |
| } |
| diff --git a/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java |
| new file mode 100644 |
| index 00000000..73b31772 |
| --- /dev/null |
| +++ b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java |
| @@ -0,0 +1,133 @@ |
| +/** |
| + * Licensed to the Apache Software Foundation (ASF) under one |
| + * or more contributor license agreements. See the NOTICE file |
| + * distributed with this work for additional information |
| + * regarding copyright ownership. The ASF licenses this file |
| + * to you under the Apache License, Version 2.0 (the |
| + * "License"); you may not use this file except in compliance |
| + * with the License. You may obtain a copy of the License at |
| + * <p> |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * <p> |
| + * Unless required by applicable law or agreed to in writing, software |
| + * distributed under the License is distributed on an "AS IS" BASIS, |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| + * See the License for the specific language governing permissions and |
| + * limitations under the License. |
| + */ |
| +package org.apache.sqoop.hbase; |
| + |
| +import com.cloudera.sqoop.lib.FieldMappable; |
| +import org.apache.hadoop.conf.Configuration; |
| +import org.apache.hadoop.hbase.client.BufferedMutator; |
| +import org.apache.hadoop.hbase.client.Connection; |
| +import org.apache.hadoop.hbase.client.Delete; |
| +import org.apache.hadoop.hbase.client.Mutation; |
| +import org.apache.hadoop.hbase.client.Put; |
| +import org.apache.sqoop.util.ExpectedLogMessage; |
| +import org.junit.Before; |
| +import org.junit.Rule; |
| +import org.junit.Test; |
| + |
| +import java.util.Arrays; |
| +import java.util.List; |
| + |
| +import static java.util.Collections.singletonList; |
| +import static org.mockito.Matchers.anyMap; |
| +import static org.mockito.Mockito.mock; |
| +import static org.mockito.Mockito.verify; |
| +import static org.mockito.Mockito.verifyNoMoreInteractions; |
| +import static org.mockito.Mockito.when; |
| + |
| +public class TestHBasePutProcessor { |
| + |
| + @Rule |
| + public ExpectedLogMessage expectedLogMessage = new ExpectedLogMessage(); |
| + |
| + private Configuration configuration; |
| + private Connection hbaseConnection; |
| + private PutTransformer putTransformer; |
| + private BufferedMutator bufferedMutator; |
| + private FieldMappable fieldMappable; |
| + |
| + private HBasePutProcessor hBasePutProcessor; |
| + |
| + @Before |
| + public void before() { |
| + configuration = mock(Configuration.class); |
| + hbaseConnection = mock(Connection.class); |
| + putTransformer = mock(PutTransformer.class); |
| + bufferedMutator = mock(BufferedMutator.class); |
| + fieldMappable = mock(FieldMappable.class); |
| + |
| + hBasePutProcessor = new HBasePutProcessor(configuration, putTransformer, hbaseConnection, bufferedMutator); |
| + } |
| + |
| + @Test |
| + public void testNoMutationIsDoneWhenNullListIsReceived() throws Exception { |
| + when(putTransformer.getMutationCommand(anyMap())).thenReturn(null); |
| + verifyNoMoreInteractions(bufferedMutator); |
| + |
| + hBasePutProcessor.accept(fieldMappable); |
| + } |
| + |
| + @Test |
| + public void testNoMutationIsDoneWhenListContainingNullsIsReceived() throws Exception { |
| + List<Mutation> inputList = Arrays.asList(null, null, null); |
| + when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList); |
| + verifyNoMoreInteractions(bufferedMutator); |
| + |
| + hBasePutProcessor.accept(fieldMappable); |
| + } |
| + |
| + @Test |
| + public void testNoMutationIsDoneWhenListContainingUnknownMutationIsReceived() throws Exception { |
| + List<Mutation> inputList = singletonList(mock(Mutation.class)); |
| + when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList); |
| + verifyNoMoreInteractions(bufferedMutator); |
| + |
| + hBasePutProcessor.accept(fieldMappable); |
| + } |
| + |
| + @Test |
| + public void testWarningIsLoggedWhenListContainingEmptyPutIsReceived() throws Exception { |
| + Mutation emptyPutMutation = mock(Put.class); |
| + when(emptyPutMutation.getRow()).thenReturn("emptyPutMutation".getBytes()); |
| + when(emptyPutMutation.isEmpty()).thenReturn(true); |
| + List<Mutation> inputList = singletonList(emptyPutMutation); |
| + when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList); |
| + verifyNoMoreInteractions(bufferedMutator); |
| + expectedLogMessage.expectWarn("Could not insert row with no columns for row-key column: emptyPutMutation"); |
| + |
| + hBasePutProcessor.accept(fieldMappable); |
| + } |
| + |
| + @Test |
| + public void testWarningIsLoggedWhenListContainingEmptyDeleteIsReceived() throws Exception { |
| + Mutation emptyDeleteMutation = mock(Delete.class); |
| + when(emptyDeleteMutation.getRow()).thenReturn("emptyDeleteMutation".getBytes()); |
| + when(emptyDeleteMutation.isEmpty()).thenReturn(true); |
| + List<Mutation> inputList = singletonList(emptyDeleteMutation); |
| + when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList); |
| + verifyNoMoreInteractions(bufferedMutator); |
| + expectedLogMessage.expectWarn("Could not delete row with no columns for row-key column: emptyDeleteMutation"); |
| + |
| + hBasePutProcessor.accept(fieldMappable); |
| + } |
| + |
| + @Test |
| + public void testMutationIsDoneForAllElementsWhenListContainingValidMutationsIsReceived() throws Exception { |
| + Mutation aPutMutation = mock(Put.class); |
| + Mutation aDeleteMutation = mock(Delete.class); |
| + Mutation anotherPutMutation = mock(Put.class); |
| + List<Mutation> inputList = Arrays.asList(aPutMutation, aDeleteMutation, anotherPutMutation); |
| + when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList); |
| + |
| + hBasePutProcessor.accept(fieldMappable); |
| + |
| + verify(bufferedMutator).mutate(aPutMutation); |
| + verify(bufferedMutator).mutate(aDeleteMutation); |
| + verify(bufferedMutator).mutate(anotherPutMutation); |
| + } |
| + |
| +} |
| \ No newline at end of file |