| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.coprocessor; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Optional; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; |
| import org.apache.hadoop.hbase.testclassification.MediumTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.wal.WALEdit; |
| import org.apache.hadoop.hbase.wal.WALKey; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
| |
| @Category(MediumTests.class) |
| public class TestRegionObserverForAddingMutationsFromCoprocessors { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestRegionObserverForAddingMutationsFromCoprocessors.class); |
| |
| private static final Logger LOG |
| = LoggerFactory.getLogger(TestRegionObserverForAddingMutationsFromCoprocessors.class); |
| |
| private static HBaseTestingUtility util; |
| private static final byte[] dummy = Bytes.toBytes("dummy"); |
| private static final byte[] row1 = Bytes.toBytes("r1"); |
| private static final byte[] row2 = Bytes.toBytes("r2"); |
| private static final byte[] row3 = Bytes.toBytes("r3"); |
| private static final byte[] test = Bytes.toBytes("test"); |
| |
| @Rule |
| public TestName name = new TestName(); |
| private TableName tableName; |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| Configuration conf = HBaseConfiguration.create(); |
| conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName()); |
| util = new HBaseTestingUtility(conf); |
| util.startMiniCluster(); |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| util.shutdownMiniCluster(); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| tableName = TableName.valueOf(name.getMethodName()); |
| } |
| |
| private void createTable(String coprocessor) throws IOException { |
| TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(dummy)) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(test)).setCoprocessor(coprocessor).build(); |
| util.getAdmin().createTable(tableDescriptor); |
| } |
| |
| /** |
| * Test various multiput operations. |
| */ |
| @Test |
| public void testMulti() throws Exception { |
| createTable(TestMultiMutationCoprocessor.class.getName()); |
| |
| try (Table t = util.getConnection().getTable(tableName)) { |
| t.put(new Put(row1).addColumn(test, dummy, dummy)); |
| assertRowCount(t, 3); |
| } |
| } |
| |
| /** |
| * Tests that added mutations from coprocessors end up in the WAL. |
| */ |
| @Test |
| public void testCPMutationsAreWrittenToWALEdit() throws Exception { |
| createTable(TestMultiMutationCoprocessor.class.getName()); |
| |
| try (Table t = util.getConnection().getTable(tableName)) { |
| t.put(new Put(row1).addColumn(test, dummy, dummy)); |
| assertRowCount(t, 3); |
| } |
| |
| assertNotNull(TestWALObserver.savedEdit); |
| assertEquals(4, TestWALObserver.savedEdit.getCells().size()); |
| } |
| |
| private static void assertRowCount(Table t, int expected) throws IOException { |
| try (ResultScanner scanner = t.getScanner(new Scan())) { |
| int i = 0; |
| for (Result r: scanner) { |
| LOG.info(r.toString()); |
| i++; |
| } |
| assertEquals(expected, i); |
| } |
| } |
| |
| @Test |
| public void testDeleteCell() throws Exception { |
| createTable(TestDeleteCellCoprocessor.class.getName()); |
| |
| try (Table t = util.getConnection().getTable(tableName)) { |
| t.put(Lists.newArrayList( |
| new Put(row1).addColumn(test, dummy, dummy), |
| new Put(row2).addColumn(test, dummy, dummy), |
| new Put(row3).addColumn(test, dummy, dummy) |
| )); |
| |
| assertRowCount(t, 3); |
| |
| t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row |
| assertRowCount(t, 1); |
| } |
| } |
| |
| @Test |
| public void testDeleteFamily() throws Exception { |
| createTable(TestDeleteFamilyCoprocessor.class.getName()); |
| |
| try (Table t = util.getConnection().getTable(tableName)) { |
| t.put(Lists.newArrayList( |
| new Put(row1).addColumn(test, dummy, dummy), |
| new Put(row2).addColumn(test, dummy, dummy), |
| new Put(row3).addColumn(test, dummy, dummy) |
| )); |
| |
| assertRowCount(t, 3); |
| |
| t.delete(new Delete(test).addFamily(test)); // delete non-existing row |
| assertRowCount(t, 1); |
| } |
| } |
| |
| @Test |
| public void testDeleteRow() throws Exception { |
| createTable(TestDeleteRowCoprocessor.class.getName()); |
| |
| try (Table t = util.getConnection().getTable(tableName)) { |
| t.put(Lists.newArrayList( |
| new Put(row1).addColumn(test, dummy, dummy), |
| new Put(row2).addColumn(test, dummy, dummy), |
| new Put(row3).addColumn(test, dummy, dummy) |
| )); |
| |
| assertRowCount(t, 3); |
| |
| t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row |
| assertRowCount(t, 1); |
| } |
| } |
| |
| @Test |
| public void testPutWithTTL() throws Exception { |
| createTable(TestPutWithTTLCoprocessor.class.getName()); |
| |
| try (Table t = util.getConnection().getTable(tableName)) { |
| t.put(new Put(row1).addColumn(test, dummy, dummy).setTTL(3000)); |
| assertRowCount(t, 2); |
| // wait long enough for the TTL to expire |
| Thread.sleep(5000); |
| assertRowCount(t, 0); |
| } |
| } |
| |
| public static class TestPutWithTTLCoprocessor implements RegionCoprocessor, RegionObserver { |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, |
| MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| Mutation mut = miniBatchOp.getOperation(0); |
| List<Cell> cells = mut.getFamilyCellMap().get(test); |
| Put[] puts = new Put[] { |
| new Put(Bytes.toBytes("cpPut")).addColumn(test, dummy, cells.get(0).getTimestamp(), |
| Bytes.toBytes("cpdummy")).setTTL(mut.getTTL()) |
| }; |
| LOG.info("Putting:" + Arrays.toString(puts)); |
| miniBatchOp.addOperationsFromCP(0, puts); |
| } |
| } |
| |
| public static class TestMultiMutationCoprocessor implements RegionCoprocessor, RegionObserver { |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, |
| MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| Mutation mut = miniBatchOp.getOperation(0); |
| List<Cell> cells = mut.getFamilyCellMap().get(test); |
| Put[] puts = new Put[] { |
| new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(), |
| Bytes.toBytes("cpdummy")), |
| new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), |
| new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), |
| }; |
| LOG.info("Putting:" + Arrays.toString(puts)); |
| miniBatchOp.addOperationsFromCP(0, puts); |
| } |
| } |
| |
| public static class TestDeleteCellCoprocessor implements RegionCoprocessor, RegionObserver { |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, |
| MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| Mutation mut = miniBatchOp.getOperation(0); |
| |
| if (mut instanceof Delete) { |
| List<Cell> cells = mut.getFamilyCellMap().get(test); |
| Delete[] deletes = new Delete[] { |
| // delete only 2 rows |
| new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()), |
| new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()), |
| }; |
| LOG.info("Deleting:" + Arrays.toString(deletes)); |
| miniBatchOp.addOperationsFromCP(0, deletes); |
| } |
| } |
| } |
| |
| public static class TestDeleteFamilyCoprocessor implements RegionCoprocessor, RegionObserver { |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, |
| MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| Mutation mut = miniBatchOp.getOperation(0); |
| |
| if (mut instanceof Delete) { |
| List<Cell> cells = mut.getFamilyCellMap().get(test); |
| Delete[] deletes = new Delete[] { |
| // delete only 2 rows |
| new Delete(row1).addFamily(test, cells.get(0).getTimestamp()), |
| new Delete(row2).addFamily(test, cells.get(0).getTimestamp()), |
| }; |
| LOG.info("Deleting:" + Arrays.toString(deletes)); |
| miniBatchOp.addOperationsFromCP(0, deletes); |
| } |
| } |
| } |
| |
| public static class TestDeleteRowCoprocessor implements RegionCoprocessor, RegionObserver { |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, |
| MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| Mutation mut = miniBatchOp.getOperation(0); |
| |
| if (mut instanceof Delete) { |
| List<Cell> cells = mut.getFamilyCellMap().get(test); |
| Delete[] deletes = new Delete[] { |
| // delete only 2 rows |
| new Delete(row1, cells.get(0).getTimestamp()), |
| new Delete(row2, cells.get(0).getTimestamp()), |
| }; |
| LOG.info("Deleting:" + Arrays.toString(deletes)); |
| miniBatchOp.addOperationsFromCP(0, deletes); |
| } |
| } |
| } |
| |
| public static class TestWALObserver implements WALCoprocessor, WALObserver { |
| static WALEdit savedEdit = null; |
| |
| @Override |
| public Optional<WALObserver> getWALObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, |
| RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { |
| if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) { |
| savedEdit = logEdit; |
| } |
| } |
| } |
| } |