| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.phoenix.end2end.index; |
| |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.phoenix.end2end.ParallelStatsDisabledIT; |
| import org.apache.phoenix.hbase.index.IndexRegionObserver; |
| import org.apache.phoenix.hbase.index.Indexer; |
| import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; |
| import org.apache.phoenix.index.GlobalIndexChecker; |
| import org.apache.phoenix.index.PhoenixIndexBuilder; |
| import org.apache.phoenix.index.PhoenixIndexCodec; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.SQLException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| @RunWith(Parameterized.class) |
| public class IndexCoprocIT extends ParallelStatsDisabledIT { |
| private boolean isNamespaceMapped = false; |
| private boolean isMultiTenant = false; |
| public static final String GLOBAL_INDEX_CHECKER_CONFIG = |
| "|org.apache.phoenix.index.GlobalIndexChecker|805306365|"; |
| public static final String INDEX_REGION_OBSERVER_CONFIG = |
| "|org.apache.phoenix.hbase.index.IndexRegionObserver" + |
| "|805306366|org.apache.hadoop.hbase.index.codec.class=" + |
| "org.apache.phoenix.index.PhoenixIndexCodec," + |
| "index.builder=org.apache.phoenix.index.PhoenixIndexBuilder"; |
| public static final String INDEXER_CONFIG = |
| "|org.apache.phoenix.hbase.index.Indexer" + |
| "|805306366|org.apache.hadoop.hbase.index.codec.class=" + |
| "org.apache.phoenix.index.PhoenixIndexCodec," + |
| "index.builder=org.apache.phoenix.index.PhoenixIndexBuilder"; |
| |
| public IndexCoprocIT(boolean isMultiTenant){ |
| this.isMultiTenant = isMultiTenant; |
| } |
| @Parameterized.Parameters(name ="CreateIndexCoprocIT_mulitTenant={0}") |
| public static Collection<Object[]> data() { |
| return Arrays.asList(new Object[][]{{true}, {false}}); |
| } |
| |
| @Test |
| public void testCreateCoprocs() throws Exception { |
| String schemaName = "S" + generateUniqueName(); |
| String tableName = "T_" + generateUniqueName(); |
| String indexName = "I_" + generateUniqueName(); |
| String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, |
| isNamespaceMapped).getString(); |
| String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName, |
| indexName, isNamespaceMapped).getString(); |
| Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin(); |
| |
| createBaseTable(schemaName, tableName, isMultiTenant, 0, null); |
| createIndexTable(schemaName, tableName, indexName); |
| |
| HTableDescriptor baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName)); |
| HTableDescriptor indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName)); |
| |
| assertCoprocsContains(IndexRegionObserver.class, baseDescriptor); |
| assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor); |
| |
| removeCoproc(IndexRegionObserver.class, baseDescriptor, admin); |
| removeCoproc(IndexRegionObserver.class, indexDescriptor, admin); |
| removeCoproc(GlobalIndexChecker.class, indexDescriptor, admin); |
| |
| Map<String, String> props = new HashMap<String, String>(); |
| props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); |
| Indexer.enableIndexing(baseDescriptor, PhoenixIndexBuilder.class, |
| props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY); |
| admin.modifyTable(baseDescriptor.getTableName(), baseDescriptor); |
| baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName)); |
| indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName)); |
| assertUsingOldCoprocs(baseDescriptor, indexDescriptor); |
| |
| createBaseTable(schemaName, tableName, true, 0, null); |
| baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName)); |
| indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName)); |
| assertUsingOldCoprocs(baseDescriptor, indexDescriptor); |
| } |
| |
| @Test |
| public void testCreateOnExistingHBaseTable() throws Exception { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| byte[] cf = Bytes.toBytes("f"); |
| try (PhoenixConnection conn = getConnection()){ |
| TableName table = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName, |
| tableName, isNamespaceMapped).getString()); |
| HTableDescriptor originalDesc = new HTableDescriptor(table); |
| originalDesc.addFamily(new HColumnDescriptor(cf)); |
| Admin admin = conn.getQueryServices().getAdmin(); |
| admin.createTable(originalDesc); |
| createBaseTable(schemaName, tableName, isMultiTenant, 0, null); |
| HTableDescriptor baseDescriptor = admin.getTableDescriptor(table); |
| assertUsingNewCoprocs(baseDescriptor); |
| createIndexTable(schemaName, tableName, indexName); |
| baseDescriptor = admin.getTableDescriptor(table); |
| TableName indexTable = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName, |
| indexName, isNamespaceMapped).getString()); |
| HTableDescriptor indexDescriptor = admin.getTableDescriptor(indexTable); |
| assertUsingNewCoprocs(baseDescriptor, indexDescriptor); |
| } |
| } |
| |
| @Test |
| public void testAlterDoesntChangeCoprocs() throws Exception { |
| String schemaName = "S" + generateUniqueName(); |
| String tableName = "T_" + generateUniqueName(); |
| String indexName = "I_" + generateUniqueName(); |
| String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, |
| isNamespaceMapped).getString(); |
| String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName, |
| indexName, isNamespaceMapped).getString(); |
| Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin(); |
| |
| createBaseTable(schemaName, tableName, isMultiTenant, 0, null); |
| createIndexTable(schemaName, tableName, indexName); |
| HTableDescriptor baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName)); |
| HTableDescriptor indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName)); |
| |
| assertCoprocsContains(IndexRegionObserver.class, baseDescriptor); |
| assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor); |
| String columnName = "foo"; |
| addColumnToBaseTable(schemaName, tableName, columnName); |
| assertCoprocsContains(IndexRegionObserver.class, baseDescriptor); |
| assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor); |
| dropColumnToBaseTable(schemaName, tableName, columnName); |
| assertCoprocsContains(IndexRegionObserver.class, baseDescriptor); |
| assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor); |
| } |
| private void assertUsingOldCoprocs(HTableDescriptor baseDescriptor, |
| HTableDescriptor indexDescriptor) { |
| assertCoprocsContains(Indexer.class, baseDescriptor); |
| assertCoprocConfig(baseDescriptor, Indexer.class.getName(), |
| INDEXER_CONFIG); |
| assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor); |
| assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor); |
| assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor); |
| } |
| |
| private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor) { |
| assertCoprocsContains(IndexRegionObserver.class, baseDescriptor); |
| assertCoprocsNotContains(Indexer.class, baseDescriptor); |
| } |
| |
| private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor, |
| HTableDescriptor indexDescriptor) { |
| assertCoprocsContains(IndexRegionObserver.class, baseDescriptor); |
| assertCoprocConfig(baseDescriptor, IndexRegionObserver.class.getName(), |
| INDEX_REGION_OBSERVER_CONFIG); |
| assertCoprocsNotContains(Indexer.class, baseDescriptor); |
| assertCoprocsNotContains(Indexer.class, indexDescriptor); |
| assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor); |
| assertCoprocConfig(indexDescriptor, GlobalIndexChecker.class.getName(), |
| GLOBAL_INDEX_CHECKER_CONFIG); |
| } |
| |
| private void assertCoprocsContains(Class clazz, HTableDescriptor descriptor) { |
| String expectedCoprocName = clazz.getName(); |
| boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName); |
| Assert.assertTrue("Could not find coproc " + expectedCoprocName + |
| " in descriptor " + descriptor,foundCoproc); |
| } |
| |
| private void assertCoprocsNotContains(Class clazz, HTableDescriptor descriptor) { |
| String expectedCoprocName = clazz.getName(); |
| boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName); |
| Assert.assertFalse("Could find coproc " + expectedCoprocName + |
| " in descriptor " + descriptor,foundCoproc); |
| } |
| |
| private boolean isCoprocPresent(HTableDescriptor descriptor, String expectedCoprocName) { |
| boolean foundCoproc = false; |
| for (String coprocName : descriptor.getCoprocessors()){ |
| if (coprocName.equals(expectedCoprocName)){ |
| foundCoproc = true; |
| break; |
| } |
| } |
| return foundCoproc; |
| } |
| |
| public static void assertCoprocConfig(HTableDescriptor indexDesc, |
| String className, String expectedConfigValue){ |
| boolean foundConfig = false; |
| for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> entry : |
| indexDesc.getValues().entrySet()){ |
| String propKey = Bytes.toString(entry.getKey().get()); |
| String propValue = Bytes.toString(entry.getValue().get()); |
| //Unfortunately, a good API to read coproc properties didn't show up until |
| //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x |
| if (propKey.contains("coprocessor")){ |
| if (propValue.contains(className)){ |
| Assert.assertEquals(className + " is configured incorrectly", |
| expectedConfigValue, |
| propValue); |
| foundConfig = true; |
| break; |
| } |
| } |
| } |
| Assert.assertTrue("Couldn't find config for " + className, foundConfig); |
| } |
| |
| private void removeCoproc(Class clazz, HTableDescriptor descriptor, Admin admin) throws Exception { |
| descriptor.removeCoprocessor(clazz.getName()); |
| admin.modifyTable(descriptor.getTableName(), descriptor); |
| } |
| |
| private void createIndexTable(String schemaName, String tableName, String indexName) |
| throws SQLException { |
| Connection conn = getConnection(); |
| String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)"); |
| } |
| |
| private void addColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{ |
| Connection conn = getConnection(); |
| String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " + |
| " ADD " + columnName + " varchar(512)"; |
| conn.createStatement().execute(ddl); |
| } |
| |
| private void dropColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{ |
| Connection conn = getConnection(); |
| String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " + |
| " DROP COLUMN " + columnName; |
| conn.createStatement().execute(ddl); |
| } |
| |
| private void createBaseTable(String schemaName, String tableName, boolean multiTenant, Integer saltBuckets, String splits) |
| throws SQLException { |
| Connection conn = getConnection(); |
| if (isNamespaceMapped) { |
| conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); |
| } |
| String ddl = "CREATE TABLE IF NOT EXISTS " |
| + SchemaUtil.getTableName(schemaName, tableName) + " (t_id VARCHAR NOT NULL,\n" + |
| "k1 VARCHAR NOT NULL,\n" + |
| "k2 INTEGER NOT NULL,\n" + |
| "v1 VARCHAR,\n" + |
| "v2 INTEGER,\n" + |
| "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"; |
| String ddlOptions = multiTenant ? "MULTI_TENANT=true" : ""; |
| if (saltBuckets != null) { |
| ddlOptions = ddlOptions |
| + (ddlOptions.isEmpty() ? "" : ",") |
| + "salt_buckets=" + saltBuckets; |
| } |
| if (splits != null) { |
| ddlOptions = ddlOptions |
| + (ddlOptions.isEmpty() ? "" : ",") |
| + "splits=" + splits; |
| } |
| conn.createStatement().execute(ddl + ddlOptions); |
| conn.close(); |
| } |
| |
| private PhoenixConnection getConnection() throws SQLException{ |
| Properties props = new Properties(); |
| props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped)); |
| return (PhoenixConnection) DriverManager.getConnection(getUrl(),props); |
| } |
| |
| } |