| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.client; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.Coprocessor; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.RegionMetrics; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.hadoop.hbase.coprocessor.RegionObserver; |
| import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; |
| import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; |
| import org.apache.hadoop.hbase.ipc.RpcClient; |
| import org.apache.hadoop.hbase.ipc.RpcClientFactory; |
| import org.apache.hadoop.hbase.ipc.ServerRpcController; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; |
| import org.apache.hadoop.hbase.regionserver.RegionScanner; |
| import org.apache.hadoop.hbase.testclassification.ClientTests; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos; |
| |
| @Category({LargeTests.class, ClientTests.class}) |
| public class TestFromClientSide3 { |
| |
/** Class-level JUnit rule obtained from {@code HBaseClassTestRule.forClass} for this test class. */
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFromClientSide3.class);

private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
// Shared testing utility: the mini cluster is started/stopped through it and every test
// creates, flushes and deletes its tables through it.
private final static HBaseTestingUtility TEST_UTIL
    = new HBaseTestingUtility();
// Timeout argument handed to TEST_UTIL.waitTableAvailable(...) after a table is created.
private static final int WAITTABLE_MILLIS = 10000;
// Column family used by the tables created in these tests.
private static byte[] FAMILY = Bytes.toBytes("testFamily");
// Source of random row keys, qualifiers and values (see randomCFPuts).
private static Random random = new Random();
// Argument passed to TEST_UTIL.startMiniCluster(...) in setUpBeforeClass().
private static int SLAVES = 3;
// Row keys, qualifier and value shared by most tests.
private static final byte[] ROW = Bytes.toBytes("testRow");
private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] VALUE = Bytes.toBytes("testValue");
// Qualifier, value and row key used by testGetEmptyRow().
private static final byte[] COL_QUAL = Bytes.toBytes("f1");
private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
private static final byte[] ROW_BYTES = Bytes.toBytes("r1");

// Exposes the running test method's name; setUp() derives the table name from it.
@Rule
public TestName name = new TestName();
// Table name for the current test; reassigned in setUp() before each test.
private TableName tableName;
| |
| /** |
| * @throws java.lang.Exception |
| */ |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| TEST_UTIL.startMiniCluster(SLAVES); |
| } |
| |
| /** |
| * @throws java.lang.Exception |
| */ |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| TEST_UTIL.shutdownMiniCluster(); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| tableName = TableName.valueOf(name.getMethodName()); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { |
| LOG.info("Tear down, remove table=" + htd.getTableName()); |
| TEST_UTIL.deleteTable(htd.getTableName()); |
| } |
| } |
| |
| private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) |
| throws Exception { |
| Put put = new Put(row); |
| for (int i = 0; i < nPuts; i++) { |
| byte[] qualifier = Bytes.toBytes(random.nextInt()); |
| byte[] value = Bytes.toBytes(random.nextInt()); |
| put.addColumn(family, qualifier, value); |
| } |
| table.put(put); |
| } |
| |
| private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family, |
| int nFlushes, int nPuts) throws Exception { |
| for (int i = 0; i < nFlushes; i++) { |
| randomCFPuts(table, row, family, nPuts); |
| admin.flush(table.getName()); |
| } |
| } |
| |
| private static List<Cell> toList(ResultScanner scanner) { |
| try { |
| List<Cell> cells = new ArrayList<>(); |
| for (Result r : scanner) { |
| cells.addAll(r.listCells()); |
| } |
| return cells; |
| } finally { |
| scanner.close(); |
| } |
| } |
| |
| @Test |
| public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| byte[] row = Bytes.toBytes("SpecifiedRow"); |
| byte[] value0 = Bytes.toBytes("value_0"); |
| byte[] value1 = Bytes.toBytes("value_1"); |
| Put put = new Put(row); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| Delete d = new Delete(row); |
| table.delete(d); |
| put = new Put(row); |
| put.addColumn(FAMILY, null, value0); |
| table.put(put); |
| put = new Put(row); |
| put.addColumn(FAMILY, null, value1); |
| table.put(put); |
| List<Cell> cells = toList(table.getScanner(new Scan())); |
| assertEquals(1, cells.size()); |
| assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); |
| |
| cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); |
| assertEquals(1, cells.size()); |
| assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); |
| |
| cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); |
| assertEquals(0, cells.size()); |
| |
| TEST_UTIL.getAdmin().flush(tableName); |
| cells = toList(table.getScanner(new Scan())); |
| assertEquals(1, cells.size()); |
| assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); |
| |
| cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); |
| assertEquals(1, cells.size()); |
| assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); |
| |
| cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); |
| assertEquals(0, cells.size()); |
| } |
| } |
| |
| @Test |
| public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| byte[] row = Bytes.toBytes("SpecifiedRow"); |
| byte[] qual0 = Bytes.toBytes("qual0"); |
| byte[] qual1 = Bytes.toBytes("qual1"); |
| long now = System.currentTimeMillis(); |
| Delete d = new Delete(row, now); |
| table.delete(d); |
| |
| Put put = new Put(row); |
| put.addColumn(FAMILY, null, now + 1, VALUE); |
| table.put(put); |
| |
| put = new Put(row); |
| put.addColumn(FAMILY, qual1, now + 2, qual1); |
| table.put(put); |
| |
| put = new Put(row); |
| put.addColumn(FAMILY, qual0, now + 3, qual0); |
| table.put(put); |
| |
| Result r = table.get(new Get(row)); |
| assertEquals(r.toString(), 3, r.size()); |
| assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); |
| assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); |
| assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); |
| |
| TEST_UTIL.getAdmin().flush(tableName); |
| r = table.get(new Get(row)); |
| assertEquals(3, r.size()); |
| assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); |
| assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); |
| assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); |
| } |
| } |
| |
| private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region) |
| throws IOException { |
| for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) { |
| if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) { |
| return metrics.getStoreFileCount(); |
| } |
| } |
| return 0; |
| } |
| |
// Override the "hbase.hstore.compaction.min" setting at the table level and then at the
// column-family level, and check which setting a requested compaction honours.
@Test
public void testAdvancedConfigOverride() throws Exception {
  /*
   * Overall idea: (1) create 3 store files and issue a compaction. config's
   * compaction.min == 3, so should work. (2) Increase the compaction.min
   * toggle in the HTD to 5 and modify table. If we use the HTD value instead
   * of the default config value, adding 3 files and issuing a compaction
   * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
   * and modify table. The CF schema should override the Table schema and now
   * cause a minor compaction.
   */
  TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);

  // Local that shadows the tableName field; both are derived from the test method name.
  final TableName tableName = TableName.valueOf(name.getMethodName());
  try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
    Admin admin = TEST_UTIL.getAdmin();

    // Create 3 store files.
    byte[] row = Bytes.toBytes(random.nextInt());
    performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100);

    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      // Verify we have multiple store files.
      HRegionLocation loc = locator.getRegionLocation(row, true);
      assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1);

      // Issue a compaction request
      admin.compact(tableName);

      // poll wait for the compactions to happen
      // (at most 250 iterations with a 40 ms sleep each, i.e. roughly 10 seconds)
      for (int i = 0; i < 10 * 1000 / 40; ++i) {
        // The number of store files after compaction should be lesser.
        loc = locator.getRegionLocation(row, true);
        if (!loc.getRegion().isOffline()) {
          if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) {
            break;
          }
        }
        Thread.sleep(40);
      }
      // verify the compactions took place and that we didn't just time out
      assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1);

      // change the compaction.min config option for this table to 5
      LOG.info("hbase.hstore.compaction.min should now be 5");
      TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor())
          .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build();
      admin.modifyTable(htd);
      LOG.info("alter status finished");

      // Create 3 more store files.
      performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10);

      // Issue a compaction request
      admin.compact(tableName);

      // This time, the compaction request should not happen
      // A fixed 10 second wait is used because there is no event to poll for when the
      // expected outcome is that nothing gets compacted.
      Thread.sleep(10 * 1000);
      loc = locator.getRegionLocation(row, true);
      // Remember the un-compacted count; the next phase must end up below it.
      int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion());
      assertTrue(sfCount > 1);

      // change an individual CF's config option to 2 & online schema update
      LOG.info("hbase.hstore.compaction.min should now be 2");
      htd = TableDescriptorBuilder.newBuilder(htd)
          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
              .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build())
          .build();
      admin.modifyTable(htd);
      LOG.info("alter status finished");

      // Issue a compaction request
      admin.compact(tableName);

      // poll wait for the compactions to happen
      // (same budget as above: at most 250 iterations with a 40 ms sleep each)
      for (int i = 0; i < 10 * 1000 / 40; ++i) {
        loc = locator.getRegionLocation(row, true);
        try {
          if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) {
            break;
          }
        } catch (Exception e) {
          // A failed metrics lookup is only logged and retried on the next iteration.
          LOG.debug("Waiting for region to come online: " +
              Bytes.toStringBinary(loc.getRegion().getRegionName()));
        }
        Thread.sleep(40);
      }

      // verify the compaction took place and that we didn't just time out
      assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount);

      // Finally, ensure that we can remove a custom config value after we made it
      LOG.info("Removing CF config value");
      LOG.info("hbase.hstore.compaction.min should now be 5");
      // The CF-level value is set to null here; the assertion below then expects the
      // descriptor to report no value for the key.
      htd = TableDescriptorBuilder.newBuilder(htd)
          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
              .setValue("hbase.hstore.compaction.min", null).build())
          .build();
      admin.modifyTable(htd);
      LOG.info("alter status finished");
      assertNull(table.getDescriptor().getColumnFamily(FAMILY)
          .getValue(Bytes.toBytes("hbase.hstore.compaction.min")));
    }
  }
}
| |
| @Test |
| public void testHTableBatchWithEmptyPut () throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| List actions = (List) new ArrayList(); |
| Object[] results = new Object[2]; |
| // create an empty Put |
| Put put1 = new Put(ROW); |
| actions.add(put1); |
| |
| Put put2 = new Put(ANOTHERROW); |
| put2.addColumn(FAMILY, QUALIFIER, VALUE); |
| actions.add(put2); |
| |
| table.batch(actions, results); |
| fail("Empty Put should have failed the batch call"); |
| } catch (IllegalArgumentException iae) { |
| } |
| } |
| |
| // Test Table.batch with large amount of mutations against the same key. |
| // It used to trigger read lock's "Maximum lock count exceeded" Error. |
| @Test |
| public void testHTableWithLargeBatch() throws IOException, InterruptedException { |
| int sixtyFourK = 64 * 1024; |
| List actions = new ArrayList(); |
| Object[] results = new Object[(sixtyFourK + 1) * 2]; |
| |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| for (int i = 0; i < sixtyFourK + 1; i++) { |
| Put put1 = new Put(ROW); |
| put1.addColumn(FAMILY, QUALIFIER, VALUE); |
| actions.add(put1); |
| |
| Put put2 = new Put(ANOTHERROW); |
| put2.addColumn(FAMILY, QUALIFIER, VALUE); |
| actions.add(put2); |
| } |
| |
| table.batch(actions, results); |
| } |
| } |
| |
| @Test |
| public void testBatchWithRowMutation() throws Exception { |
| LOG.info("Starting testBatchWithRowMutation"); |
| byte [][] QUALIFIERS = new byte [][] { |
| Bytes.toBytes("a"), Bytes.toBytes("b") |
| }; |
| |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| RowMutations arm = RowMutations.of(Collections.singletonList( |
| new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); |
| Object[] batchResult = new Object[1]; |
| table.batch(Arrays.asList(arm), batchResult); |
| |
| Get g = new Get(ROW); |
| Result r = table.get(g); |
| assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); |
| |
| arm = RowMutations.of(Arrays.asList( |
| new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), |
| new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); |
| table.batch(Arrays.asList(arm), batchResult); |
| r = table.get(g); |
| assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); |
| assertNull(r.getValue(FAMILY, QUALIFIERS[0])); |
| |
| // Test that we get the correct remote exception for RowMutations from batch() |
| try { |
| arm = RowMutations.of(Collections.singletonList( |
| new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE))); |
| table.batch(Arrays.asList(arm), batchResult); |
| fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); |
| } catch(RetriesExhaustedException e) { |
| String msg = e.getMessage(); |
| assertTrue(msg.contains("NoSuchColumnFamilyException")); |
| } |
| } |
| } |
| |
| @Test |
| public void testBatchWithCheckAndMutate() throws Exception { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| byte[] row1 = Bytes.toBytes("row1"); |
| byte[] row2 = Bytes.toBytes("row2"); |
| byte[] row3 = Bytes.toBytes("row3"); |
| byte[] row4 = Bytes.toBytes("row4"); |
| byte[] row5 = Bytes.toBytes("row5"); |
| byte[] row6 = Bytes.toBytes("row6"); |
| byte[] row7 = Bytes.toBytes("row7"); |
| |
| table.put(Arrays.asList( |
| new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), |
| new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), |
| new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), |
| new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), |
| new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), |
| new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), |
| new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))); |
| |
| CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1) |
| .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) |
| .build(new RowMutations(row1) |
| .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) |
| .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) |
| .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) |
| .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); |
| Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); |
| RowMutations mutations = new RowMutations(row3) |
| .add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) |
| .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) |
| .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) |
| .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); |
| CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4) |
| .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) |
| .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); |
| Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); |
| CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6) |
| .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) |
| .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); |
| CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7) |
| .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) |
| .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); |
| |
| List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, |
| checkAndMutate3, checkAndMutate4); |
| Object[] results = new Object[actions.size()]; |
| table.batch(actions, results); |
| |
| CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0]; |
| assertTrue(checkAndMutateResult.isSuccess()); |
| assertEquals(3L, |
| Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); |
| assertEquals("d", |
| Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); |
| |
| assertEquals("b", |
| Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); |
| |
| Result result = (Result) results[2]; |
| assertTrue(result.getExists()); |
| assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); |
| assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); |
| |
| checkAndMutateResult = (CheckAndMutateResult) results[3]; |
| assertFalse(checkAndMutateResult.isSuccess()); |
| assertNull(checkAndMutateResult.getResult()); |
| |
| assertTrue(((Result) results[4]).isEmpty()); |
| |
| checkAndMutateResult = (CheckAndMutateResult) results[5]; |
| assertTrue(checkAndMutateResult.isSuccess()); |
| assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult() |
| .getValue(FAMILY, Bytes.toBytes("F")))); |
| |
| checkAndMutateResult = (CheckAndMutateResult) results[6]; |
| assertTrue(checkAndMutateResult.isSuccess()); |
| assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult() |
| .getValue(FAMILY, Bytes.toBytes("G")))); |
| |
| result = table.get(new Get(row1)); |
| assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); |
| assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); |
| assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); |
| assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); |
| |
| result = table.get(new Get(row3)); |
| assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); |
| assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); |
| assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); |
| assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); |
| assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); |
| |
| result = table.get(new Get(row4)); |
| assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); |
| |
| result = table.get(new Get(row5)); |
| assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); |
| |
| result = table.get(new Get(row6)); |
| assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); |
| |
| result = table.get(new Get(row7)); |
| assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); |
| } |
| } |
| |
| @Test |
| public void testHTableExistsMethodSingleRegionSingleGet() |
| throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| // Test with a single region table. |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| |
| Get get = new Get(ROW); |
| |
| boolean exist = table.exists(get); |
| assertFalse(exist); |
| |
| table.put(put); |
| |
| exist = table.exists(get); |
| assertTrue(exist); |
| } |
| } |
| |
| @Test |
| public void testHTableExistsMethodSingleRegionMultipleGets() |
| throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| |
| List<Get> gets = new ArrayList<>(); |
| gets.add(new Get(ROW)); |
| gets.add(new Get(ANOTHERROW)); |
| |
| boolean[] results = table.exists(gets); |
| assertTrue(results[0]); |
| assertFalse(results[1]); |
| } |
| } |
| |
| @Test |
| public void testHTableExistsBeforeGet() throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| |
| Get get = new Get(ROW); |
| |
| boolean exist = table.exists(get); |
| assertEquals(true, exist); |
| |
| Result result = table.get(get); |
| assertEquals(false, result.isEmpty()); |
| assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); |
| } |
| } |
| |
| @Test |
| public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| put = new Put(ROW2); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| |
| Get get = new Get(ROW); |
| Get get2 = new Get(ROW2); |
| ArrayList<Get> getList = new ArrayList(2); |
| getList.add(get); |
| getList.add(get2); |
| |
| boolean[] exists = table.exists(getList); |
| assertEquals(true, exists[0]); |
| assertEquals(true, exists[1]); |
| |
| Result[] result = table.get(getList); |
| assertEquals(false, result[0].isEmpty()); |
| assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); |
| assertEquals(false, result[1].isEmpty()); |
| assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); |
| } |
| } |
| |
| @Test |
| public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { |
| try (Table table = TEST_UTIL.createTable( |
| tableName, new byte[][] { FAMILY }, |
| 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| |
| Get get = new Get(ROW); |
| |
| boolean exist = table.exists(get); |
| assertFalse(exist); |
| |
| table.put(put); |
| |
| exist = table.exists(get); |
| assertTrue(exist); |
| } |
| } |
| |
| @Test |
| public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { |
| try (Table table = TEST_UTIL.createTable( |
| tableName, |
| new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| |
| List<Get> gets = new ArrayList<>(); |
| gets.add(new Get(ANOTHERROW)); |
| gets.add(new Get(Bytes.add(ROW, new byte[]{0x00}))); |
| gets.add(new Get(ROW)); |
| gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00}))); |
| |
| LOG.info("Calling exists"); |
| boolean[] results = table.exists(gets); |
| assertFalse(results[0]); |
| assertFalse(results[1]); |
| assertTrue(results[2]); |
| assertFalse(results[3]); |
| |
| // Test with the first region. |
| put = new Put(new byte[]{0x00}); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| |
| gets = new ArrayList<>(); |
| gets.add(new Get(new byte[]{0x00})); |
| gets.add(new Get(new byte[]{0x00, 0x00})); |
| results = table.exists(gets); |
| assertTrue(results[0]); |
| assertFalse(results[1]); |
| |
| // Test with the last region |
| put = new Put(new byte[]{(byte) 0xff, (byte) 0xff}); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| table.put(put); |
| |
| gets = new ArrayList<>(); |
| gets.add(new Get(new byte[]{(byte) 0xff})); |
| gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff})); |
| gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff})); |
| results = table.exists(gets); |
| assertFalse(results[0]); |
| assertTrue(results[1]); |
| assertFalse(results[2]); |
| } |
| } |
| |
| @Test |
| public void testGetEmptyRow() throws Exception { |
| //Create a table and put in 1 row |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| Put put = new Put(ROW_BYTES); |
| put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); |
| table.put(put); |
| |
| //Try getting the row with an empty row key |
| Result res = null; |
| try { |
| res = table.get(new Get(new byte[0])); |
| fail(); |
| } catch (IllegalArgumentException e) { |
| // Expected. |
| } |
| assertTrue(res == null); |
| res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); |
| assertTrue(res.isEmpty() == true); |
| res = table.get(new Get(ROW_BYTES)); |
| assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); |
| } |
| } |
| |
| @Test |
| public void testConnectionDefaultUsesCodec() throws Exception { |
| try ( |
| RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) { |
| assertTrue(client.hasCellBlockSupport()); |
| } |
| } |
| |
| @Test |
| public void testPutWithPreBatchMutate() throws Exception { |
| testPreBatchMutate(tableName, () -> { |
| try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| t.put(put); |
| } catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| }); |
| } |
| |
| @Test |
| public void testRowMutationsWithPreBatchMutate() throws Exception { |
| testPreBatchMutate(tableName, () -> { |
| try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { |
| RowMutations rm = new RowMutations(ROW, 1); |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| rm.add(put); |
| t.mutateRow(rm); |
| } catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| }); |
| } |
| |
| private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception { |
| TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) |
| .setCoprocessor(WaitingForScanObserver.class.getName()).build(); |
| TEST_UTIL.getAdmin().createTable(tableDescriptor); |
| // Don't use waitTableAvailable(), because the scanner will mess up the co-processor |
| |
| ExecutorService service = Executors.newFixedThreadPool(2); |
| service.execute(rn); |
| final List<Cell> cells = new ArrayList<>(); |
| service.execute(() -> { |
| try { |
| // waiting for update. |
| TimeUnit.SECONDS.sleep(3); |
| try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { |
| Scan scan = new Scan(); |
| try (ResultScanner scanner = t.getScanner(scan)) { |
| for (Result r : scanner) { |
| cells.addAll(Arrays.asList(r.rawCells())); |
| } |
| } |
| } |
| } catch (IOException | InterruptedException ex) { |
| throw new RuntimeException(ex); |
| } |
| }); |
| service.shutdown(); |
| service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); |
| assertEquals("The write is blocking by RegionObserver#postBatchMutate" |
| + ", so the data is invisible to reader", 0, cells.size()); |
| TEST_UTIL.deleteTable(tableName); |
| } |
| |
| @Test |
| public void testLockLeakWithDelta() throws Exception, Throwable { |
| TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) |
| .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) |
| .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); |
| TEST_UTIL.getAdmin().createTable(tableDescriptor); |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| // new a connection for lower retry number. |
| Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); |
| copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); |
| try (Connection con = ConnectionFactory.createConnection(copy)) { |
| HRegion region = (HRegion) find(tableName); |
| region.setTimeoutForWriteLock(10); |
| ExecutorService putService = Executors.newSingleThreadExecutor(); |
| putService.execute(() -> { |
| try (Table table = con.getTable(tableName)) { |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, VALUE); |
| // the put will be blocked by WaitingForMultiMutationsObserver. |
| table.put(put); |
| } catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| }); |
| ExecutorService appendService = Executors.newSingleThreadExecutor(); |
| appendService.execute(() -> { |
| Append append = new Append(ROW); |
| append.addColumn(FAMILY, QUALIFIER, VALUE); |
| try (Table table = con.getTable(tableName)) { |
| table.append(append); |
| fail("The APPEND should fail because the target lock is blocked by previous put"); |
| } catch (Exception ex) { |
| } |
| }); |
| appendService.shutdown(); |
| appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); |
| WaitingForMultiMutationsObserver observer = |
| find(tableName, WaitingForMultiMutationsObserver.class); |
| observer.latch.countDown(); |
| putService.shutdown(); |
| putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); |
| try (Table table = con.getTable(tableName)) { |
| Result r = table.get(new Get(ROW)); |
| assertFalse(r.isEmpty()); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); |
| } |
| } |
| HRegion region = (HRegion) find(tableName); |
| int readLockCount = region.getReadLockCount(); |
| LOG.info("readLockCount:" + readLockCount); |
| assertEquals(0, readLockCount); |
| } |
| |
| @Test |
| public void testMultiRowMutations() throws Exception, Throwable { |
| TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) |
| .setCoprocessor(MultiRowMutationEndpoint.class.getName()) |
| .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) |
| .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); |
| TEST_UTIL.getAdmin().createTable(tableDescriptor); |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| // new a connection for lower retry number. |
| Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); |
| copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); |
| try (Connection con = ConnectionFactory.createConnection(copy)) { |
| byte[] row = Bytes.toBytes("ROW-0"); |
| byte[] rowLocked= Bytes.toBytes("ROW-1"); |
| byte[] value0 = Bytes.toBytes("VALUE-0"); |
| byte[] value1 = Bytes.toBytes("VALUE-1"); |
| byte[] value2 = Bytes.toBytes("VALUE-2"); |
| assertNoLocks(tableName); |
| ExecutorService putService = Executors.newSingleThreadExecutor(); |
| putService.execute(() -> { |
| try (Table table = con.getTable(tableName)) { |
| Put put0 = new Put(rowLocked); |
| put0.addColumn(FAMILY, QUALIFIER, value0); |
| // the put will be blocked by WaitingForMultiMutationsObserver. |
| table.put(put0); |
| } catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| }); |
| ExecutorService cpService = Executors.newSingleThreadExecutor(); |
| AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean(); |
| cpService.execute(() -> { |
| Put put1 = new Put(row); |
| Put put2 = new Put(rowLocked); |
| put1.addColumn(FAMILY, QUALIFIER, value1); |
| put2.addColumn(FAMILY, QUALIFIER, value2); |
| try (Table table = con.getTable(tableName)) { |
| MultiRowMutationProtos.MutateRowsRequest request = |
| MultiRowMutationProtos.MutateRowsRequest.newBuilder() |
| .addMutationRequest( |
| ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1)) |
| .addMutationRequest( |
| ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2)) |
| .build(); |
| table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, |
| ROW, ROW, |
| (MultiRowMutationProtos.MultiRowMutationService exe) -> { |
| ServerRpcController controller = new ServerRpcController(); |
| CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse> |
| rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); |
| exe.mutateRows(controller, request, rpcCallback); |
| if (controller.failedOnException() && |
| !(controller.getFailedOn() instanceof UnknownProtocolException)) { |
| exceptionDuringMutateRows.set(true); |
| } |
| return rpcCallback.get(); |
| }); |
| } catch (Throwable ex) { |
| LOG.error("encountered " + ex); |
| } |
| }); |
| cpService.shutdown(); |
| cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); |
| WaitingForMultiMutationsObserver observer = find(tableName, |
| WaitingForMultiMutationsObserver.class); |
| observer.latch.countDown(); |
| putService.shutdown(); |
| putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); |
| try (Table table = con.getTable(tableName)) { |
| Get g0 = new Get(row); |
| Get g1 = new Get(rowLocked); |
| Result r0 = table.get(g0); |
| Result r1 = table.get(g1); |
| assertTrue(r0.isEmpty()); |
| assertFalse(r1.isEmpty()); |
| assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); |
| } |
| assertNoLocks(tableName); |
| if (!exceptionDuringMutateRows.get()) { |
| fail("This cp should fail because the target lock is blocked by previous put"); |
| } |
| } |
| } |
| |
| /** |
| * A test case for issue HBASE-17482 |
| * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped |
| * onto cells in the append thread, a countdown latch is used to ensure that happened |
| * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) |
| * make the seqid/mvcc acquirement in handler thread and stamping in append thread |
| * No countdown latch to assure cells in memstore are stamped with seqid/mvcc. |
| * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner |
| * with a smaller readpoint can see these data, which disobey the multi version |
| * concurrency control rules. |
| * This test case is to reproduce this scenario. |
| * @throws IOException |
| */ |
| @Test |
| public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { |
| try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| //put two row first to init the scanner |
| Put put = new Put(Bytes.toBytes("0")); |
| put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); |
| table.put(put); |
| put = new Put(Bytes.toBytes("00")); |
| put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); |
| table.put(put); |
| Scan scan = new Scan(); |
| scan.setTimeRange(0, Long.MAX_VALUE); |
| scan.setCaching(1); |
| ResultScanner scanner = table.getScanner(scan); |
| int rowNum = scanner.next() != null ? 1 : 0; |
| //the started scanner shouldn't see the rows put below |
| for (int i = 1; i < 1000; i++) { |
| put = new Put(Bytes.toBytes(String.valueOf(i))); |
| put.setDurability(Durability.ASYNC_WAL); |
| put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); |
| table.put(put); |
| } |
| for (Result result : scanner) { |
| rowNum++; |
| } |
| //scanner should only see two rows |
| assertEquals(2, rowNum); |
| scanner = table.getScanner(scan); |
| rowNum = 0; |
| for (Result result : scanner) { |
| rowNum++; |
| } |
| // the new scanner should see all rows |
| assertEquals(1001, rowNum); |
| } |
| } |
| |
| @Test |
| public void testPutThenGetWithMultipleThreads() throws Exception { |
| final int THREAD_NUM = 20; |
| final int ROUND_NUM = 10; |
| for (int round = 0; round < ROUND_NUM; round++) { |
| ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); |
| final AtomicInteger successCnt = new AtomicInteger(0); |
| try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| for (int i = 0; i < THREAD_NUM; i++) { |
| final int index = i; |
| Thread t = new Thread(new Runnable() { |
| |
| @Override |
| public void run() { |
| final byte[] row = Bytes.toBytes("row-" + index); |
| final byte[] value = Bytes.toBytes("v" + index); |
| try { |
| Put put = new Put(row); |
| put.addColumn(FAMILY, QUALIFIER, value); |
| ht.put(put); |
| Get get = new Get(row); |
| Result result = ht.get(get); |
| byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); |
| if (Bytes.equals(value, returnedValue)) { |
| successCnt.getAndIncrement(); |
| } else { |
| LOG.error("Should be equal but not, original value: " + Bytes.toString(value) |
| + ", returned value: " |
| + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); |
| } |
| } catch (Throwable e) { |
| // do nothing |
| } |
| } |
| }); |
| threads.add(t); |
| } |
| for (Thread t : threads) { |
| t.start(); |
| } |
| for (Thread t : threads) { |
| t.join(); |
| } |
| assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); |
| } |
| TEST_UTIL.deleteTable(tableName); |
| } |
| } |
| |
| private static void assertNoLocks(final TableName tableName) |
| throws IOException, InterruptedException { |
| HRegion region = (HRegion) find(tableName); |
| assertEquals(0, region.getLockedRows().size()); |
| } |
| private static HRegion find(final TableName tableName) |
| throws IOException, InterruptedException { |
| HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); |
| List<HRegion> regions = rs.getRegions(tableName); |
| assertEquals(1, regions.size()); |
| return regions.get(0); |
| } |
| |
| private static <T extends RegionObserver> T find(final TableName tableName, |
| Class<T> clz) throws IOException, InterruptedException { |
| HRegion region = find(tableName); |
| Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); |
| assertTrue("The cp instance should be " + clz.getName() |
| + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); |
| return clz.cast(cp); |
| } |
| |
| public static class WaitingForMultiMutationsObserver |
| implements RegionCoprocessor, RegionObserver { |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, |
| final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| try { |
| latch.await(); |
| } catch (InterruptedException ex) { |
| throw new IOException(ex); |
| } |
| } |
| } |
| |
| public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { |
| private final CountDownLatch latch = new CountDownLatch(1); |
| |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, |
| final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| try { |
| // waiting for scanner |
| latch.await(); |
| } catch (InterruptedException ex) { |
| throw new IOException(ex); |
| } |
| } |
| |
| @Override |
| public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, |
| final Scan scan, final RegionScanner s) throws IOException { |
| latch.countDown(); |
| return s; |
| } |
| } |
| |
| static byte[] generateHugeValue(int size) { |
| Random rand = ThreadLocalRandom.current(); |
| byte[] value = new byte[size]; |
| for (int i = 0; i < value.length; i++) { |
| value[i] = (byte) rand.nextInt(256); |
| } |
| return value; |
| } |
| |
| @Test |
| public void testScanWithBatchSizeReturnIncompleteCells() throws IOException, InterruptedException { |
| TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) |
| .build(); |
| try (Table table = TEST_UTIL.createTable(hd, null)) { |
| TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); |
| table.put(put); |
| |
| put = new Put(ROW); |
| put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); |
| table.put(put); |
| |
| for (int i = 2; i < 5; i++) { |
| for (int version = 0; version < 2; version++) { |
| put = new Put(ROW); |
| put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); |
| table.put(put); |
| } |
| } |
| |
| Scan scan = new Scan(); |
| scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) |
| .setMaxResultSize(4 * 1024 * 1024); |
| Result result; |
| try (ResultScanner scanner = table.getScanner(scan)) { |
| List<Result> list = new ArrayList<>(); |
| /* |
| * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second |
| * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The |
| * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the |
| * moreResultsInRegion also would be false. Finally, the client should collect all the cells |
| * into two result: 2+3 -> 3+2; |
| */ |
| while ((result = scanner.next()) != null) { |
| list.add(result); |
| } |
| |
| Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); |
| Assert.assertEquals(2, list.size()); |
| Assert.assertEquals(3, list.get(0).size()); |
| Assert.assertEquals(2, list.get(1).size()); |
| } |
| |
| scan = new Scan(); |
| scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) |
| .setMaxResultSize(4 * 1024 * 1024); |
| try (ResultScanner scanner = table.getScanner(scan)) { |
| List<Result> list = new ArrayList<>(); |
| while ((result = scanner.next()) != null) { |
| list.add(result); |
| } |
| Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); |
| Assert.assertEquals(3, list.size()); |
| Assert.assertEquals(2, list.get(0).size()); |
| Assert.assertEquals(2, list.get(1).size()); |
| Assert.assertEquals(1, list.get(2).size()); |
| } |
| } |
| } |
| } |