| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.end2end.index; |
| |
| import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Properties; |
| |
| import jline.internal.Log; |
| |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.MetaTableAccessor; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.client.HTableInterface; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HStore; |
| import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; |
| import org.apache.phoenix.end2end.ParallelStatsDisabledIT; |
| import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.query.BaseTest; |
| import org.apache.phoenix.query.ConnectionQueryServices; |
| import org.apache.phoenix.query.QueryConstants; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.schema.PIndexState; |
| import org.apache.phoenix.schema.PTableKey; |
| import org.apache.phoenix.util.ByteUtil; |
| import org.apache.phoenix.util.EnvironmentEdgeManager; |
| import org.apache.phoenix.util.IndexScrutiny; |
| import org.apache.phoenix.util.IndexUtil; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.PropertiesUtil; |
| import org.apache.phoenix.util.QueryUtil; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.TestUtil; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameters; |
| |
| import com.google.common.primitives.Doubles; |
| |
| @RunWith(Parameterized.class) |
| public class MutableIndexIT extends ParallelStatsDisabledIT { |
| |
| protected final boolean localIndex; |
| private final String tableDDLOptions; |
| |
| public MutableIndexIT(Boolean localIndex, String txProvider, Boolean columnEncoded) { |
| this.localIndex = localIndex; |
| StringBuilder optionBuilder = new StringBuilder(); |
| if (txProvider != null) { |
| optionBuilder.append("TRANSACTIONAL=true," + PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + "='" + txProvider + "'"); |
| } |
| if (!columnEncoded) { |
| if (optionBuilder.length()!=0) |
| optionBuilder.append(","); |
| optionBuilder.append("COLUMN_ENCODED_BYTES=0"); |
| } |
| this.tableDDLOptions = optionBuilder.toString(); |
| } |
| |
| private static Connection getConnection(Properties props) throws SQLException { |
| props.setProperty(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(1)); |
| Connection conn = DriverManager.getConnection(getUrl(), props); |
| return conn; |
| } |
| |
| private static Connection getConnection() throws SQLException { |
| Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); |
| return getConnection(props); |
| } |
| |
| @Parameters(name="MutableIndexIT_localIndex={0},transactional={1},columnEncoded={2}") // name is used by failsafe as file name in reports |
| public static Collection<Object[]> data() { |
| return Arrays.asList(new Object[][] { |
| { false, null, false }, { false, null, true }, |
| { false, "TEPHRA", false }, { false, "TEPHRA", true } // , |
| //{ false, "OMID", false }, { false, "OMID", true }, |
| // { true, null, false }, { true, null, true }, |
| // { true, "TEPHRA", false }, { true, "TEPHRA", true }, |
| //{ true, "OMID", false }, { true, "OMID", true }, |
| }); |
| } |
| |
| @Test |
| public void testCoveredColumnUpdates() throws Exception { |
| try (Connection conn = getConnection()) { |
| conn.setAutoCommit(false); |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); |
| String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); |
| |
| TestUtil.createMultiCFTestTable(conn, fullTableName, tableDDLOptions); |
| populateMultiCFTestTable(fullTableName); |
| conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName |
| + " (char_col1 ASC, int_col1 ASC) INCLUDE (long_col1, long_col2)"); |
| |
| String query = "SELECT char_col1, int_col1, long_col2 from " + fullTableName; |
| ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query); |
| if (localIndex) { |
| assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName +" [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); |
| } else { |
| assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); |
| } |
| |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(2, rs.getInt(2)); |
| assertEquals(3L, rs.getLong(3)); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(3, rs.getInt(2)); |
| assertEquals(4L, rs.getLong(3)); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(4, rs.getInt(2)); |
| assertEquals(5L, rs.getLong(3)); |
| assertFalse(rs.next()); |
| |
| PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName |
| + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2*2 FROM " |
| + fullTableName + " WHERE long_col2=?"); |
| stmt.setLong(1,4L); |
| assertEquals(1,stmt.executeUpdate()); |
| conn.commit(); |
| |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(2, rs.getInt(2)); |
| assertEquals(3L, rs.getLong(3)); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(3, rs.getInt(2)); |
| assertEquals(8L, rs.getLong(3)); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(4, rs.getInt(2)); |
| assertEquals(5L, rs.getLong(3)); |
| assertFalse(rs.next()); |
| |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName |
| + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, CAST(null AS BIGINT) FROM " |
| + fullTableName + " WHERE long_col2=?"); |
| stmt.setLong(1,3L); |
| assertEquals(1,stmt.executeUpdate()); |
| conn.commit(); |
| |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(2, rs.getInt(2)); |
| assertEquals(0, rs.getLong(3)); |
| assertTrue(rs.wasNull()); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(3, rs.getInt(2)); |
| assertEquals(8L, rs.getLong(3)); |
| assertTrue(rs.next()); |
| assertEquals("chara", rs.getString(1)); |
| assertEquals(4, rs.getInt(2)); |
| assertEquals(5L, rs.getLong(3)); |
| assertFalse(rs.next()); |
| if(localIndex) { |
| query = "SELECT b.* from " + fullTableName + " where int_col1 = 4"; |
| rs = conn.createStatement().executeQuery("EXPLAIN " + query); |
| assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName +" [1]\n" + |
| " SERVER FILTER BY TO_INTEGER(\"INT_COL1\") = 4\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("varchar_b", rs.getString(1)); |
| assertEquals("charb", rs.getString(2)); |
| assertEquals(5, rs.getInt(3)); |
| assertEquals(5, rs.getLong(4)); |
| assertFalse(rs.next()); |
| |
| } |
| } |
| } |
| |
| @Test |
| public void testCoveredColumns() throws Exception { |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); |
| String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); |
| try (Connection conn = getConnection()) { |
| |
| conn.setAutoCommit(false); |
| String query; |
| ResultSet rs; |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery(query); |
| assertFalse(rs.next()); |
| |
| conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertFalse(rs.next()); |
| |
| PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); |
| stmt.setString(1,"a"); |
| stmt.setString(2, "x"); |
| stmt.setString(3, "1"); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("x",rs.getString(1)); |
| assertEquals("a",rs.getString(2)); |
| assertEquals("1",rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); |
| stmt.setString(1,"a"); |
| stmt.setString(2, null); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("x",rs.getString(1)); |
| assertEquals("a",rs.getString(2)); |
| assertNull(rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery("EXPLAIN " + query); |
| if(localIndex) { |
| assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); |
| } else { |
| assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); |
| } |
| |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("a",rs.getString(1)); |
| assertEquals("x",rs.getString(2)); |
| assertNull(rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); |
| stmt.setString(1,"a"); |
| stmt.setString(2,"3"); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery("EXPLAIN " + query); |
| if(localIndex) { |
| assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName + " [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); |
| } else { |
| assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); |
| } |
| |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("a",rs.getString(1)); |
| assertEquals("x",rs.getString(2)); |
| assertEquals("3",rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); |
| stmt.setString(1,"a"); |
| stmt.setString(2,"4"); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery("EXPLAIN " + query); |
| if(localIndex) { |
| assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); |
| } else { |
| assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs)); |
| } |
| |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("a",rs.getString(1)); |
| assertEquals("x",rs.getString(2)); |
| assertEquals("4",rs.getString(3)); |
| assertFalse(rs.next()); |
| } |
| } |
| |
| @Test |
| public void testCompoundIndexKey() throws Exception { |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); |
| String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); |
| try (Connection conn = getConnection()) { |
| conn.setAutoCommit(false); |
| String query; |
| ResultSet rs; |
| // make sure that the tables are empty, but reachable |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery(query); |
| assertFalse(rs.next()); |
| conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertFalse(rs.next()); |
| |
| // load some data into the table |
| PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); |
| stmt.setString(1,"a"); |
| stmt.setString(2, "x"); |
| stmt.setString(3, "1"); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("x",rs.getString(1)); |
| assertEquals("1",rs.getString(2)); |
| assertEquals("a",rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); |
| stmt.setString(1,"a"); |
| stmt.setString(2, "y"); |
| stmt.setString(3, null); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("y",rs.getString(1)); |
| assertNull(rs.getString(2)); |
| assertEquals("a",rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery("EXPLAIN " + query); |
| if (localIndex) { |
| assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\n" |
| + " SERVER FILTER BY FIRST KEY ONLY\n" |
| + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); |
| } else { |
| assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n" |
| + " SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs)); |
| } |
| //make sure the data table looks like what we expect |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("a",rs.getString(1)); |
| assertEquals("y",rs.getString(2)); |
| assertNull(rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| // Upsert new row with null leading index column |
| stmt.setString(1,"b"); |
| stmt.setString(2, null); |
| stmt.setString(3, "3"); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals(null,rs.getString(1)); |
| assertEquals("3",rs.getString(2)); |
| assertEquals("b",rs.getString(3)); |
| assertTrue(rs.next()); |
| assertEquals("y",rs.getString(1)); |
| assertNull(rs.getString(2)); |
| assertEquals("a",rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| // Update row with null leading index column to have a value |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?)"); |
| stmt.setString(1,"b"); |
| stmt.setString(2, "z"); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("y",rs.getString(1)); |
| assertNull(rs.getString(2)); |
| assertEquals("a",rs.getString(3)); |
| assertTrue(rs.next()); |
| assertEquals("z",rs.getString(1)); |
| assertEquals("3",rs.getString(2)); |
| assertEquals("b",rs.getString(3)); |
| assertFalse(rs.next()); |
| } |
| |
| } |
| |
| /** |
| * There was a case where if there were multiple updates to a single row in the same batch, the |
| * index wouldn't be updated correctly as each element of the batch was evaluated with the state |
| * previous to the batch, rather than with the rest of the batch. This meant you could do a put |
| * and a delete on a row in the same batch and the index result would contain the current + put |
| * and current + delete, but not current + put + delete. |
| * @throws Exception on failure |
| */ |
| @Test |
| public void testMultipleUpdatesToSingleRow() throws Exception { |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); |
| String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); |
| try (Connection conn = getConnection()) { |
| conn.setAutoCommit(false); |
| String query; |
| ResultSet rs; |
| // make sure that the tables are empty, but reachable |
| conn.createStatement().execute( |
| "CREATE TABLE " + fullTableName |
| + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery(query); |
| assertFalse(rs.next()); |
| |
| conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertFalse(rs.next()); |
| |
| // load some data into the table |
| PreparedStatement stmt = |
| conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); |
| stmt.setString(1, "a"); |
| stmt.setString(2, "x"); |
| stmt.setString(3, "1"); |
| stmt.execute(); |
| conn.commit(); |
| |
| // make sure the index is working as expected |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("x", rs.getString(1)); |
| assertEquals("1", rs.getString(2)); |
| assertEquals("a", rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| // do multiple updates to the same row, in the same batch |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, v1) VALUES(?,?)"); |
| stmt.setString(1, "a"); |
| stmt.setString(2, "y"); |
| stmt.execute(); |
| stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)"); |
| stmt.setString(1, "a"); |
| stmt.setString(2, null); |
| stmt.execute(); |
| conn.commit(); |
| |
| query = "SELECT * FROM " + fullIndexName; |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("y", rs.getString(1)); |
| assertNull(rs.getString(2)); |
| assertEquals("a", rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery("EXPLAIN " + query); |
| if(localIndex) { |
| assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\n" |
| + " SERVER FILTER BY FIRST KEY ONLY\n" |
| + "CLIENT MERGE SORT", |
| QueryUtil.getExplainPlan(rs)); |
| } else { |
| assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n" |
| + " SERVER FILTER BY FIRST KEY ONLY", |
| QueryUtil.getExplainPlan(rs)); |
| } |
| |
| // check that the data table matches as expected |
| rs = conn.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals("a", rs.getString(1)); |
| assertEquals("y", rs.getString(2)); |
| assertNull(rs.getString(3)); |
| assertFalse(rs.next()); |
| } |
| } |
| |
| @Test |
| public void testUpsertingNullForIndexedColumns() throws Exception { |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); |
| String testTableName = tableName + "_" + System.currentTimeMillis(); |
| try (Connection conn = getConnection()) { |
| conn.setAutoCommit(false); |
| ResultSet rs; |
| Statement stmt = conn.createStatement(); |
| stmt.execute("CREATE TABLE " + testTableName + "(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR) "+tableDDLOptions); |
| stmt.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + testTableName + " (v2) INCLUDE(v3)"); |
| |
| //create a row with value null for indexed column v2 |
| stmt.executeUpdate("upsert into " + testTableName + " values('cc1', null, 'abc')"); |
| conn.commit(); |
| |
| //assert values in index table |
| rs = stmt.executeQuery("select * from " + fullIndexName); |
| assertTrue(rs.next()); |
| assertEquals(0, Doubles.compare(0, rs.getDouble(1))); |
| assertTrue(rs.wasNull()); |
| assertEquals("cc1", rs.getString(2)); |
| assertEquals("abc", rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| //assert values in data table |
| rs = stmt.executeQuery("select v1, v2, v3 from " + testTableName); |
| assertTrue(rs.next()); |
| assertEquals("cc1", rs.getString(1)); |
| assertEquals(0, Doubles.compare(0, rs.getDouble(2))); |
| assertTrue(rs.wasNull()); |
| assertEquals("abc", rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| //update the previously null value for indexed column v2 to a non-null value 1.23 |
| stmt.executeUpdate("upsert into " + testTableName + " values('cc1', 1.23, 'abc')"); |
| conn.commit(); |
| |
| //assert values in data table |
| rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from " + testTableName); |
| assertTrue(rs.next()); |
| assertEquals("cc1", rs.getString(1)); |
| assertEquals(0, Doubles.compare(1.23, rs.getDouble(2))); |
| assertEquals("abc", rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| //assert values in index table |
| rs = stmt.executeQuery("select * from " + indexName); |
| assertTrue(rs.next()); |
| assertEquals(0, Doubles.compare(1.23, rs.getDouble(1))); |
| assertEquals("cc1", rs.getString(2)); |
| assertEquals("abc", rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| //update the value for indexed column v2 back to null |
| stmt.executeUpdate("upsert into " + testTableName + " values('cc1', null, 'abc')"); |
| conn.commit(); |
| |
| //assert values in index table |
| rs = stmt.executeQuery("select * from " + indexName); |
| assertTrue(rs.next()); |
| assertEquals(0, Doubles.compare(0, rs.getDouble(1))); |
| assertTrue(rs.wasNull()); |
| assertEquals("cc1", rs.getString(2)); |
| assertEquals("abc", rs.getString(3)); |
| assertFalse(rs.next()); |
| |
| //assert values in data table |
| rs = stmt.executeQuery("select v1, v2, v3 from " + testTableName); |
| assertTrue(rs.next()); |
| assertEquals("cc1", rs.getString(1)); |
| assertEquals(0, Doubles.compare(0, rs.getDouble(2))); |
| assertEquals("abc", rs.getString(3)); |
| assertFalse(rs.next()); |
| } |
| } |
| |
| |
| private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException { |
| PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); |
| assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows()); |
| } |
| |
| @Test |
| public void testAlterTableWithImmutability() throws Exception { |
| String query; |
| ResultSet rs; |
| String tableName = "TBL_" + generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); |
| |
| try (Connection conn = getConnection()) { |
| conn.setAutoCommit(false); |
| conn.createStatement().execute( |
| "CREATE TABLE " + fullTableName +" (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR) " + tableDDLOptions); |
| |
| query = "SELECT * FROM " + fullTableName; |
| rs = conn.createStatement().executeQuery(query); |
| assertFalse(rs.next()); |
| |
| assertImmutableRows(conn,fullTableName, false); |
| conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET IMMUTABLE_ROWS=true"); |
| assertImmutableRows(conn,fullTableName, true); |
| |
| |
| conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET immutable_rows=false"); |
| assertImmutableRows(conn,fullTableName, false); |
| } |
| } |
| |
| private void createTableAndLoadData(Connection conn1, String tableName, String indexName, String[] strings, boolean isReverse) throws SQLException { |
| createBaseTable(conn1, tableName, null); |
| for (int i = 0; i < 26; i++) { |
| conn1.createStatement().execute( |
| "UPSERT INTO " + tableName + " values('"+strings[i]+"'," + i + "," |
| + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')"); |
| } |
| conn1.commit(); |
| conn1.createStatement().execute( |
| "CREATE " + (localIndex ? "LOCAL" : "")+" INDEX " + indexName + " ON " + tableName + "(v1"+(isReverse?" DESC":"")+") include (k3)"); |
| } |
| |
| @Test |
| public void testIndexHalfStoreFileReader() throws Exception { |
| Connection conn1 = getConnection(); |
| ConnectionQueryServices connectionQueryServices = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES); |
| HBaseAdmin admin = connectionQueryServices.getAdmin(); |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| createBaseTable(conn1, tableName, "('e')"); |
| conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+" INDEX " + indexName + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON ('e')")); |
| conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')"); |
| conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')"); |
| conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')"); |
| conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')"); |
| conn1.commit(); |
| |
| |
| String query = "SELECT count(*) FROM " + tableName +" where v1<='z'"; |
| ResultSet rs = conn1.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals(4, rs.getInt(1)); |
| |
| TableName indexTable = TableName.valueOf(localIndex?tableName: indexName); |
| admin.flush(indexTable); |
| boolean merged = false; |
| HTableInterface table = connectionQueryServices.getTable(indexTable.getName()); |
| // merge regions until 1 left |
| long numRegions = 0; |
| while (true) { |
| rs = conn1.createStatement().executeQuery(query); |
| assertTrue(rs.next()); |
| assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results? |
| try { |
| List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable); |
| numRegions = indexRegions.size(); |
| if (numRegions==1) { |
| break; |
| } |
| if(!merged) { |
| List<HRegionInfo> regions = |
| admin.getTableRegions(indexTable); |
| Log.info("Merging: " + regions.size()); |
| admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(), |
| regions.get(1).getEncodedNameAsBytes(), false); |
| merged = true; |
| Threads.sleep(10000); |
| } |
| } catch (Exception ex) { |
| Log.info(ex); |
| } |
| long waitStartTime = System.currentTimeMillis(); |
| // wait until merge happened |
| while (System.currentTimeMillis() - waitStartTime < 10000) { |
| List<HRegionInfo> regions = admin.getTableRegions(indexTable); |
| Log.info("Waiting:" + regions.size()); |
| if (regions.size() < numRegions) { |
| break; |
| } |
| Threads.sleep(1000); |
| } |
| SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(), indexTable); |
| assertTrue("Index table should be online ", admin.isTableAvailable(indexTable)); |
| } |
| } |
| |
| |
| private List<HRegionInfo> splitDuringScan(Connection conn1, String tableName, String indexName, String[] strings, HBaseAdmin admin, boolean isReverse) |
| throws SQLException, IOException, InterruptedException { |
| ResultSet rs; |
| |
| String query = "SELECT t_id,k1,v1 FROM " + tableName; |
| rs = conn1.createStatement().executeQuery(query); |
| String[] tIdColumnValues = new String[26]; |
| String[] v1ColumnValues = new String[26]; |
| int[] k1ColumnValue = new int[26]; |
| for (int j = 0; j < 5; j++) { |
| assertTrue(rs.next()); |
| tIdColumnValues[j] = rs.getString("t_id"); |
| k1ColumnValue[j] = rs.getInt("k1"); |
| v1ColumnValues[j] = rs.getString("V1"); |
| } |
| |
| String[] splitKeys = new String[2]; |
| splitKeys[0] = strings[4]; |
| splitKeys[1] = strings[12]; |
| |
| int[] splitInts = new int[2]; |
| splitInts[0] = 22; |
| splitInts[1] = 4; |
| List<HRegionInfo> regionsOfUserTable = null; |
| for(int i = 0; i <=1; i++) { |
| Threads.sleep(10000); |
| if(localIndex) { |
| admin.split(Bytes.toBytes(tableName), |
| ByteUtil.concat(Bytes.toBytes(splitKeys[i]))); |
| } else { |
| admin.split(Bytes.toBytes(indexName), ByteUtil.concat(Bytes.toBytes(splitInts[i]))); |
| } |
| Thread.sleep(100); |
| regionsOfUserTable = |
| MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), |
| admin.getConnection(), TableName.valueOf(localIndex?tableName:indexName), |
| false); |
| |
| while (regionsOfUserTable.size() != (i+2)) { |
| Thread.sleep(100); |
| regionsOfUserTable = |
| MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), |
| admin.getConnection(), |
| TableName.valueOf(localIndex?tableName:indexName), false); |
| } |
| assertEquals(i+2, regionsOfUserTable.size()); |
| } |
| for (int j = 5; j < 26; j++) { |
| assertTrue(rs.next()); |
| tIdColumnValues[j] = rs.getString("t_id"); |
| k1ColumnValue[j] = rs.getInt("k1"); |
| v1ColumnValues[j] = rs.getString("V1"); |
| } |
| Arrays.sort(tIdColumnValues); |
| Arrays.sort(v1ColumnValues); |
| Arrays.sort(k1ColumnValue); |
| assertTrue(Arrays.equals(strings, tIdColumnValues)); |
| assertTrue(Arrays.equals(strings, v1ColumnValues)); |
| for(int i=0;i<26;i++) { |
| assertEquals(i, k1ColumnValue[i]); |
| } |
| assertFalse(rs.next()); |
| return regionsOfUserTable; |
| } |
| |
| private void createBaseTable(Connection conn, String tableName, String splits) throws SQLException { |
| String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + |
| "k1 INTEGER NOT NULL,\n" + |
| "k2 INTEGER NOT NULL,\n" + |
| "k3 INTEGER,\n" + |
| "v1 VARCHAR,\n" + |
| "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n" |
| + (tableDDLOptions!=null?tableDDLOptions:"") + (splits != null ? (" split on " + splits) : ""); |
| conn.createStatement().execute(ddl); |
| } |
| |
| @Test |
| public void testTenantSpecificConnection() throws Exception { |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); |
| Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); |
| try (Connection conn = getConnection()) { |
| conn.setAutoCommit(false); |
| // create data table |
| conn.createStatement().execute( |
| "CREATE TABLE IF NOT EXISTS " + fullTableName + |
| "(TENANT_ID CHAR(15) NOT NULL,"+ |
| "TYPE VARCHAR(25),"+ |
| "ENTITY_ID CHAR(15) NOT NULL,"+ |
| "CONSTRAINT PK_CONSTRAINT PRIMARY KEY (TENANT_ID, ENTITY_ID)) MULTI_TENANT=TRUE " |
| + (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "") ); |
| // create index |
| conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + fullTableName + " (ENTITY_ID, TYPE)"); |
| |
| // upsert rows |
| String dml = "UPSERT INTO " + fullTableName + " (ENTITY_ID, TYPE) VALUES ( ?, ?)"; |
| props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "tenant1"); |
| // connection is tenant-specific |
| try (Connection tenantConn = getConnection(props)) { |
| // upsert one row |
| upsertRow(dml, tenantConn, 0); |
| tenantConn.commit(); |
| ResultSet rs = tenantConn.createStatement().executeQuery("SELECT ENTITY_ID FROM " + fullTableName + " ORDER BY TYPE LIMIT 5"); |
| assertTrue(rs.next()); |
| // upsert two rows which ends up using the tenant cache |
| upsertRow(dml, tenantConn, 1); |
| upsertRow(dml, tenantConn, 2); |
| tenantConn.commit(); |
| } |
| } |
| } |
| |
| // Tests that if major compaction is run on a table with a disabled index, |
| // deleted cells are kept |
| @Test |
| public void testCompactDisabledIndex() throws Exception { |
| try (Connection conn = getConnection()) { |
| String schemaName = generateUniqueName(); |
| String dataTableName = generateUniqueName() + "_DATA"; |
| String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); |
| String indexTableName = generateUniqueName() + "_IDX"; |
| String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); |
| conn.createStatement().execute( |
| String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName)); |
| conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL, |
| indexTableName, dataTableFullName)); |
| |
| //insert a row, and delete it |
| PartialScannerResultsDisabledIT.writeSingleBatch(conn, 1, 1, dataTableFullName); |
| conn.createStatement().execute("DELETE FROM " + dataTableFullName); |
| conn.commit(); |
| |
| // disable the index, simulating an index write failure |
| PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); |
| IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE, |
| EnvironmentEdgeManager.currentTimeMillis()); |
| |
| // major compaction should not remove the deleted row |
| List<HRegion> regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(dataTableFullName)); |
| HRegion hRegion = regions.get(0); |
| hRegion.flush(true); |
| HStore store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); |
| store.triggerMajorCompaction(); |
| store.compactRecentForTestingAssumingDefaultPolicy(1); |
| HTableInterface dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); |
| assertEquals(1, TestUtil.getRawRowCount(dataHTI)); |
| |
| // reenable the index |
| IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.INACTIVE, |
| EnvironmentEdgeManager.currentTimeMillis()); |
| IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.ACTIVE, 0L); |
| |
| // now major compaction should remove the deleted row |
| store.triggerMajorCompaction(); |
| store.compactRecentForTestingAssumingDefaultPolicy(1); |
| dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); |
| assertEquals(0, TestUtil.getRawRowCount(dataHTI)); |
| } |
| } |
| |
| // some tables (e.g. indexes on views) have UngroupedAgg coproc loaded, but don't have a |
| // corresponding row in syscat. This tests that compaction isn't blocked |
| @Test(timeout=120000) |
| public void testCompactNonPhoenixTable() throws Exception { |
| try (Connection conn = getConnection()) { |
| // create a vanilla HBase table (non-Phoenix) |
| String randomTable = generateUniqueName(); |
| TableName hbaseTN = TableName.valueOf(randomTable); |
| byte[] famBytes = Bytes.toBytes("fam"); |
| HTable hTable = getUtility().createTable(hbaseTN, famBytes); |
| TestUtil.addCoprocessor(conn, randomTable, UngroupedAggregateRegionObserver.class); |
| Put put = new Put(Bytes.toBytes("row")); |
| byte[] value = new byte[1]; |
| Bytes.random(value); |
| put.add(famBytes, Bytes.toBytes("colQ"), value); |
| hTable.put(put); |
| hTable.flushCommits(); |
| |
| // major compaction shouldn't cause a timeout or RS abort |
| List<HRegion> regions = getUtility().getHBaseCluster().getRegions(hbaseTN); |
| HRegion hRegion = regions.get(0); |
| hRegion.flush(true); |
| HStore store = (HStore) hRegion.getStore(famBytes); |
| store.triggerMajorCompaction(); |
| store.compactRecentForTestingAssumingDefaultPolicy(1); |
| |
| // we should be able to compact syscat itself as well |
| regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)); |
| hRegion = regions.get(0); |
| hRegion.flush(true); |
| store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); |
| store.triggerMajorCompaction(); |
| store.compactRecentForTestingAssumingDefaultPolicy(1); |
| } |
| } |
| |
| /** |
| * PHOENIX-4988 |
| * Test updating only a non-indexed column after two successive deletes to an indexed row |
| */ |
| @Test |
| public void testUpdateNonIndexedColumn() throws Exception { |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); |
| String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); |
| try (Connection conn = getConnection()) { |
| conn.setAutoCommit(false); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions); |
| conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v2)"); |
| conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1,v2) VALUES ('testKey','v1_1','v2_1')"); |
| conn.commit(); |
| conn.createStatement().executeUpdate("DELETE FROM " + fullTableName); |
| conn.commit(); |
| conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1,v2) VALUES ('testKey','v1_2','v2_2')"); |
| conn.commit(); |
| conn.createStatement().executeUpdate("DELETE FROM " + fullTableName); |
| conn.commit(); |
| conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1) VALUES ('testKey','v1_3')"); |
| conn.commit(); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| // PHOENIX-4980 |
| // When there is a flush after a data table update of non-indexed columns, the |
| // index gets out of sync on the next write |
| getUtility().getHBaseAdmin().flush(TableName.valueOf(fullTableName)); |
| conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1,v2) VALUES ('testKey','v1_4','v2_3')"); |
| conn.commit(); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } |
| } |
| |
| private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException { |
| PreparedStatement stmt = tenantConn.prepareStatement(dml); |
| stmt.setString(1, "00000000000000" + String.valueOf(i)); |
| stmt.setString(2, String.valueOf(i)); |
| assertEquals(1,stmt.executeUpdate()); |
| } |
| } |