blob: a4cff74ab729341a62dfde870568375f7c877190 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
@Category({CoprocessorTests.class, MediumTests.class})
public class TestCoprocessorInterface {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorInterface.class);
@Rule public TestName name = new TestName();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
static final Path DIR = TEST_UTIL.getDataTestDir();
private static class CustomScanner implements RegionScanner {
private RegionScanner delegate;
public CustomScanner(RegionScanner delegate) {
this.delegate = delegate;
}
@Override
public boolean next(List<Cell> results) throws IOException {
return delegate.next(results);
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
return delegate.next(result, scannerContext);
}
@Override
public boolean nextRaw(List<Cell> result)
throws IOException {
return delegate.nextRaw(result);
}
@Override
public boolean nextRaw(List<Cell> result, ScannerContext context)
throws IOException {
return delegate.nextRaw(result, context);
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
public RegionInfo getRegionInfo() {
return delegate.getRegionInfo();
}
@Override
public boolean isFilterDone() throws IOException {
return delegate.isFilterDone();
}
@Override
public boolean reseek(byte[] row) throws IOException {
return false;
}
@Override
public long getMaxResultSize() {
return delegate.getMaxResultSize();
}
@Override
public long getMvccReadPoint() {
return delegate.getMvccReadPoint();
}
@Override
public int getBatch() {
return delegate.getBatch();
}
}
public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {
private boolean startCalled;
private boolean stopCalled;
private boolean preOpenCalled;
private boolean postOpenCalled;
private boolean preCloseCalled;
private boolean postCloseCalled;
private boolean preCompactCalled;
private boolean postCompactCalled;
private boolean preFlushCalled;
private boolean postFlushCalled;
private ConcurrentMap<String, Object> sharedData;
@Override
public void start(CoprocessorEnvironment e) {
sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
// using new String here, so that there will be new object on each invocation
sharedData.putIfAbsent("test1", new Object());
startCalled = true;
}
@Override
public void stop(CoprocessorEnvironment e) {
sharedData = null;
stopCalled = true;
}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
preOpenCalled = true;
}
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
postOpenCalled = true;
}
@Override
public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
preCloseCalled = true;
}
@Override
public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
postCloseCalled = true;
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
preCompactCalled = true;
return scanner;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
postCompactCalled = true;
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
preFlushCalled = true;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
postFlushCalled = true;
}
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {
return new CustomScanner(s);
}
boolean wasStarted() {
return startCalled;
}
boolean wasStopped() {
return stopCalled;
}
boolean wasOpened() {
return (preOpenCalled && postOpenCalled);
}
boolean wasClosed() {
return (preCloseCalled && postCloseCalled);
}
boolean wasFlushed() {
return (preFlushCalled && postFlushCalled);
}
boolean wasCompacted() {
return (preCompactCalled && postCompactCalled);
}
Map<String, Object> getSharedData() {
return sharedData;
}
}
public static class CoprocessorII implements RegionCoprocessor {
private ConcurrentMap<String, Object> sharedData;
@Override
public void start(CoprocessorEnvironment e) {
sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
sharedData.putIfAbsent("test2", new Object());
}
@Override
public void stop(CoprocessorEnvironment e) {
sharedData = null;
}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(new RegionObserver() {
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
throw new RuntimeException();
}
});
}
Map<String, Object> getSharedData() {
return sharedData;
}
}
@Test
public void testSharedData() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
byte [][] families = { fam1, fam2, fam3 };
Configuration hc = initConfig();
HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[]{}, families);
for (int i = 0; i < 3; i++) {
HTestConst.addContent(region, fam3);
region.flush(true);
}
region.compact(false);
region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
assertNotNull(o);
assertNotNull(o2);
// to coprocessors get different sharedDatas
assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
// make sure that all coprocessor of a class have identical sharedDatas
assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
// now have all Environments fail
try {
byte [] r = region.getRegionInfo().getStartKey();
if (r == null || r.length <= 0) {
// Its the start row. Can't ask for null. Ask for minimal key instead.
r = new byte [] {0};
}
Get g = new Get(r);
region.get(g);
fail();
} catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
}
assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
c = c2 = null;
// perform a GC
System.gc();
// reopen the region
region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
// CPimpl is unaffected, still the same reference
assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
// new map and object created, hence the reference is different
// hence the old entry was indeed removed by the GC and new one has been created
Object o3 = ((CoprocessorII)c2).getSharedData().get("test2");
assertFalse(o3 == o2);
HBaseTestingUtility.closeRegionAndWAL(region);
}
@Test
public void testCoprocessorInterface() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
byte [][] families = { fam1, fam2, fam3 };
Configuration hc = initConfig();
HRegion region = initHRegion(tableName, name.getMethodName(), hc,
new Class<?>[]{CoprocessorImpl.class}, families);
for (int i = 0; i < 3; i++) {
HTestConst.addContent(region, fam3);
region.flush(true);
}
region.compact(false);
// HBASE-4197
Scan s = new Scan();
RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
assertTrue(scanner instanceof CustomScanner);
// this would throw an exception before HBASE-4197
scanner.next(new ArrayList<>());
HBaseTestingUtility.closeRegionAndWAL(region);
Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
assertTrue(((CoprocessorImpl)c).wasOpened());
assertTrue(((CoprocessorImpl)c).wasClosed());
assertTrue(((CoprocessorImpl)c).wasFlushed());
assertTrue(((CoprocessorImpl)c).wasCompacted());
}
HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
throws IOException {
//RegionInfo info = new RegionInfo(tableName, null, null, false);
HRegion r = HRegion.openHRegion(closedRegion, null);
// this following piece is a hack. currently a coprocessorHost
// is secretly loaded at OpenRegionHandler. we don't really
// start a region server here, so just manually create cphost
// and set it to region.
Configuration conf = TEST_UTIL.getConfiguration();
RegionCoprocessorHost host = new RegionCoprocessorHost(r,
Mockito.mock(RegionServerServices.class), conf);
r.setCoprocessorHost(host);
for (Class<?> implClass : implClasses) {
host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
}
// we need to manually call pre- and postOpen here since the
// above load() is not the real case for CP loading. A CP is
// expected to be loaded by default from 1) configuration; or 2)
// HTableDescriptor. If it's loaded after HRegion initialized,
// the pre- and postOpen() won't be triggered automatically.
// Here we have to call pre and postOpen explicitly.
host.preOpen();
host.postOpen();
return r;
}
HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
Class<?>[] implClasses, byte[][] families) throws IOException {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
for (byte[] family : families) {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
}
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(null)
.setEndKey(null)
.setSplit(false)
.build();
Path path = new Path(DIR + callingMethod);
HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, builder.build());
// this following piece is a hack.
RegionCoprocessorHost host =
new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
r.setCoprocessorHost(host);
for (Class<?> implClass : implClasses) {
host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
Coprocessor c = host.findCoprocessor(implClass.getName());
assertNotNull(c);
}
// Here we have to call pre and postOpen explicitly.
host.preOpen();
host.postOpen();
return r;
}
private Configuration initConfig() {
// Always compact if there is more than one store file.
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
// Increase the amount of time between client retries
TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
1024 * 128);
TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
return TEST_UTIL.getConfiguration();
}
}