/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.hbase.index.covered;

import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.hbase.index.IndexTestingUtils;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.TableName;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * End-to-End test of just the {@link NonTxIndexBuilder}, but with a simple
 * {@link IndexCodec} and BatchCache implementation.
 */
@Category(NeedsOwnMiniClusterTest.class)
public class EndToEndCoveredColumnsIndexBuilderIT {

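  /**
   * Simple holder for the state each test needs: the primary table under test, the codec that
   * verifies the intermediate table state, and the fixed timestamp used for the test's edits.
   */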
  public class TestState {

    private final HTable table;
    private final long ts;
    private final VerifyingIndexCodec codec;

    /**
     * @param primary primary table the test writes to
     * @param codec codec installed on the indexer so the intermediate table state can be verified
     * @param ts timestamp at which the test's edits are written
     */
    public TestState(HTable primary, VerifyingIndexCodec codec, long ts) {
      this.table = primary;
      this.ts = ts;
      this.codec = codec;
    }

  }

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] family = Bytes.toBytes("FAM");
  private static final byte[] qual = Bytes.toBytes("qual");
  private static final HColumnDescriptor FAM1 = new HColumnDescriptor(family);

  @Rule
  public TableName TestTable = new TableName();

  private TestState state;

  @BeforeClass
  public static void setupCluster() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    setUpConfigForMiniCluster(conf);
    IndexTestingUtils.setupConfig(conf);
    // disable version checking, so we can test against whatever version of HBase happens to be
    // installed (right now, it's generally going to be SNAPSHOT versions).
    conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
    UTIL.startMiniCluster();
  }
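
  @AfterClass
  public static void shutdownCluster() throws Exception {
    // stop the mini-cluster started in setupCluster() so the tests don't leave it running
    UTIL.shutdownMiniCluster();
  }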

  @Before
  public void setup() throws Exception {
    this.state = setupTest(TestTable.getTableNameString());
  }

  private interface TableStateVerifier {

    /**
     * Verify that the state of the table is correct. Should fail the unit test if it isn't as
     * expected.
     * @param state current {@link TableState} passed to the index codec
     */
    void verify(TableState state);

  }

  /**
   * {@link TableStateVerifier} that ensures the kvs returned from the table match the passed
   * {@link KeyValue}s when querying on the given columns.
   */
  private class ListMatchingVerifier implements TableStateVerifier {

    private List<Cell> expectedKvs;
    private ColumnReference[] columns;
    private String msg;

    public ListMatchingVerifier(String msg, List<Cell> kvs, ColumnReference... columns) {
      this.expectedKvs = kvs;
      this.columns = columns;
      this.msg = msg;
    }

    @Override
    public void verify(TableState state) {
      IndexMetaData indexMetaData = new IndexMetaData() {

        @Override
        public boolean isImmutableRows() {
          return false;
        }

        @Override
        public boolean ignoreNewerMutations() {
          return false;
        }

      };
      try {
        Scanner kvs =
            ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns), false,
              false, indexMetaData).getFirst();

        int count = 0;
        Cell kv;
        while ((kv = kvs.next()) != null) {
          Cell next = expectedKvs.get(count++);
          assertEquals(
            msg + ": Unexpected kv in table state!\nexpected value: "
                + Bytes.toString(next.getValue()) + "\nactual value: "
                + Bytes.toString(kv.getValue()), next, kv);
        }

        assertEquals(msg + ": Didn't find enough kvs in table state!", expectedKvs.size(), count);
      } catch (IOException e) {
        fail(msg + ": Got an exception while reading local table state! " + e.getMessage());
      }
    }
  }

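  /**
   * {@link IndexCodec} that runs the next queued {@link TableStateVerifier}, if there is one,
   * against the {@link TableState} it is handed before delegating to the parent test codec. This
   * is how the tests below assert on the intermediate table state the index builder sees.
   */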
  private class VerifyingIndexCodec extends CoveredIndexCodecForTesting {

    private Queue<TableStateVerifier> verifiers = new ArrayDeque<TableStateVerifier>();

    @Override
    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
      verify(state);
      return super.getIndexDeletes(state, context);
    }

    @Override
    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
      verify(state);
      return super.getIndexUpserts(state, context);
    }

    private void verify(TableState state) {
      TableStateVerifier verifier = verifiers.poll();
      if (verifier == null) return;
      verifier.verify(state);
    }
  }

  /**
   * Test that we see the expected values in a {@link TableState} when doing single puts against a
   * region.
   * @throws Exception on failure
   */
  @Test
  public void testExpectedResultsInTableStateForSinglePut() throws Exception {
    // just do a simple Put to start with
    long ts = state.ts;
    Put p = new Put(row, ts);
    p.add(family, qual, Bytes.toBytes("v1"));

    // get all the underlying kvs for the put
    final List<Cell> expectedKvs = new ArrayList<Cell>();
    final List<Cell> allKvs = new ArrayList<Cell>();
    allKvs.addAll(p.getFamilyCellMap().get(family));

    // setup the verifier for the data we expect to write
    // first call shouldn't have anything in the table
    final ColumnReference familyRef =
        new ColumnReference(EndToEndCoveredColumnsIndexBuilderIT.family,
          ColumnReference.ALL_QUALIFIERS);

    VerifyingIndexCodec codec = state.codec;
    codec.verifiers.add(new ListMatchingVerifier("cleanup state 1", expectedKvs, familyRef));
    codec.verifiers.add(new ListMatchingVerifier("put state 1", allKvs, familyRef));

    // do the actual put (no indexing will actually be done)
    HTable primary = state.table;
    primary.put(p);
    primary.flushCommits();

    // now we do another put to the same row. We should see just the old row state, followed by the
    // new + old
    p = new Put(row, ts + 1);
    p.add(family, qual, Bytes.toBytes("v2"));
    expectedKvs.addAll(allKvs);
    // add them first b/c the ts is newer
    allKvs.addAll(0, p.get(family, qual));
    codec.verifiers.add(new ListMatchingVerifier("cleanup state 2", expectedKvs, familyRef));
    codec.verifiers.add(new ListMatchingVerifier("put state 2", allKvs, familyRef));

    // do the actual put
    primary.put(p);
    primary.flushCommits();

    // cleanup after ourselves
    cleanup(state);
  }

  /**
   * Similar to {@link #testExpectedResultsInTableStateForSinglePut()}, but against batches of
   * puts. Previous implementations managed batches by playing the current state against each
   * element in the batch, rather than combining all the per-row updates into a single mutation
   * for the batch. This test ensures that we see the correct expected state.
   * @throws Exception on failure
   */
  @Test
  public void testExpectedResultsInTableStateForBatchPuts() throws Exception {
    long ts = state.ts;
    // build up a list of puts to make, all on the same row
    Put p1 = new Put(row, ts);
    p1.add(family, qual, Bytes.toBytes("v1"));
    Put p2 = new Put(row, ts + 1);
    p2.add(family, qual, Bytes.toBytes("v2"));

    // setup all the verifiers we need. This is just the same as above, but will be called twice
    // since we need to iterate the batch.

    // get all the underlying kvs for the puts
    final List<Cell> allKvs = new ArrayList<Cell>(2);
    allKvs.addAll(p2.getFamilyCellMap().get(family));
    allKvs.addAll(p1.getFamilyCellMap().get(family));

    // setup the verifier for the data we expect to write
    // both puts should be put into a single batch
    final ColumnReference familyRef =
        new ColumnReference(EndToEndCoveredColumnsIndexBuilderIT.family,
          ColumnReference.ALL_QUALIFIERS);
    VerifyingIndexCodec codec = state.codec;
    // no previous state in the table
    codec.verifiers.add(new ListMatchingVerifier("cleanup state 1",
        Collections.<Cell> emptyList(), familyRef));
    codec.verifiers.add(new ListMatchingVerifier("put state 1",
        p1.getFamilyCellMap().get(family), familyRef));

    codec.verifiers.add(new ListMatchingVerifier("cleanup state 2",
        p1.getFamilyCellMap().get(family), familyRef));
    // kvs from both puts should be in the table now
    codec.verifiers.add(new ListMatchingVerifier("put state 2", allKvs, familyRef));

    // do the actual puts (no indexing will actually be done)
    HTable primary = state.table;
    primary.setAutoFlush(false);
    primary.put(Arrays.asList(p1, p2));
    primary.flushCommits();

    // cleanup after ourselves
    cleanup(state);
  }

  /**
   * @param tableName name of the table to create for the test
   * @return the supporting state for the test
   */
  private TestState setupTest(String tableName) throws IOException {
    byte[] tableNameBytes = Bytes.toBytes(tableName);
    @SuppressWarnings("deprecation")
    HTableDescriptor desc = new HTableDescriptor(tableNameBytes);
    desc.addFamily(FAM1);
    // add the necessary simple options to create the builder
    Map<String, String> indexerOpts = new HashMap<String, String>();
    // just need some codec here - the verifying codec is swapped in below, but the builder's
    // initializer blows up if nothing is configured.
    indexerOpts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
      CoveredIndexCodecForTesting.class.getName());
    Indexer.enableIndexing(desc, NonTxIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);

    // create the table
    HBaseAdmin admin = UTIL.getHBaseAdmin();
    admin.createTable(desc);
    HTable primary = new HTable(UTIL.getConfiguration(), tableNameBytes);

    // overwrite the codec so we can verify the current state
    Region region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
    Indexer indexer =
        (Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName());
    NonTxIndexBuilder builder = (NonTxIndexBuilder) indexer.getBuilderForTesting();
    VerifyingIndexCodec codec = new VerifyingIndexCodec();
    builder.setIndexCodecForTesting(codec);

    // pin the clock so the test and the server-side index builder agree on the current time
    final long ts = System.currentTimeMillis();
    EnvironmentEdge edge = new EnvironmentEdge() {

      @Override
      public long currentTime() {
        return ts;
      }
    };
    EnvironmentEdgeManager.injectEdge(edge);

    return new TestState(primary, codec, ts);
  }

  /**
   * Cleanup the test based on the passed state.
   * @param state state created by {@link #setupTest(String)} for the test that just ran
   */
  private void cleanup(TestState state) throws IOException {
    EnvironmentEdgeManager.reset();
    state.table.close();
    UTIL.deleteTable(state.table.getTableName());
  }
}