| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.end2end.index; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.client.HTableInterface; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; |
| import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; |
| import org.apache.phoenix.coprocessor.MetaDataRegionObserver; |
| import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; |
| import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.execute.CommitException; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.jdbc.PhoenixStatement; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.schema.PIndexState; |
| import org.apache.phoenix.schema.PMetaData; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.PTableKey; |
| import org.apache.phoenix.schema.TableNotFoundException; |
| import org.apache.phoenix.util.EnvironmentEdge; |
| import org.apache.phoenix.util.EnvironmentEdgeManager; |
| import org.apache.phoenix.util.IndexScrutiny; |
| import org.apache.phoenix.util.IndexUtil; |
| import org.apache.phoenix.util.MetaDataUtil; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.apache.phoenix.util.Repeat; |
| import org.apache.phoenix.util.RunUntilFailure; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.TestUtil; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Maps; |
| |
| @SuppressWarnings("deprecation") |
| @RunWith(RunUntilFailure.class) |
| public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { |
| private static final Logger LOG = LoggerFactory.getLogger(PartialIndexRebuilderIT.class); |
| private static final Random RAND = new Random(5); |
| private static final int WAIT_AFTER_DISABLED = 5000; |
| private static final long REBUILD_PERIOD = 50000; |
| private static final long REBUILD_INTERVAL = 2000; |
| private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; |
| |
| |
| @BeforeClass |
| public static void doSetup() throws Exception { |
| Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); |
| serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString()); |
| serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL)); |
| serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000"); |
| serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds |
| serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED)); |
| Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); |
| clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); |
| setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); |
| indexRebuildTaskRegionEnvironment = |
| (RegionCoprocessorEnvironment) getUtility() |
| .getRSForFirstRegionInTable( |
| PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) |
| .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) |
| .get(0).getCoprocessorHost() |
| .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); |
| MetaDataRegionObserver.initRebuildIndexConnectionProps( |
| indexRebuildTaskRegionEnvironment.getConfiguration()); |
| } |
| |
| private static void runIndexRebuilder(String table) throws InterruptedException, SQLException { |
| runIndexRebuilder(Collections.<String>singletonList(table)); |
| } |
| |
| private static void runIndexRebuilder(List<String> tables) throws InterruptedException, SQLException { |
| BuildIndexScheduleTask task = |
| new MetaDataRegionObserver.BuildIndexScheduleTask( |
| indexRebuildTaskRegionEnvironment, tables); |
| task.run(); |
| } |
| |
| private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, String table) { |
| runIndexRebuilderAsync(interval, cancel, Collections.<String>singletonList(table)); |
| } |
| private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, final List<String> tables) { |
| Thread thread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| while (!cancel[0]) { |
| try { |
| runIndexRebuilder(tables); |
| Thread.sleep(interval); |
| } catch (InterruptedException e) { |
| Thread.interrupted(); |
| throw new RuntimeException(e); |
| } catch (SQLException e) { |
| LOG.error(e.getMessage(),e); |
| } |
| } |
| } |
| }); |
| thread.setDaemon(true); |
| thread.start(); |
| } |
| |
| private static void mutateRandomly(final String fullTableName, final int nThreads, final int nRows, final int nIndexValues, final int batchSize, final CountDownLatch doneSignal) { |
| Runnable[] runnables = new Runnable[nThreads]; |
| for (int i = 0; i < nThreads; i++) { |
| runnables[i] = new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| Connection conn = DriverManager.getConnection(getUrl()); |
| for (int i = 0; i < 3000; i++) { |
| boolean isNull = RAND.nextBoolean(); |
| int randInt = RAND.nextInt() % nIndexValues; |
| int pk = Math.abs(RAND.nextInt()) % nRows; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (" + pk + ", 0, " + (isNull ? null : randInt) + ")"); |
| if ((i % batchSize) == 0) { |
| conn.commit(); |
| } |
| } |
| conn.commit(); |
| for (int i = 0; i < 3000; i++) { |
| int pk = Math.abs(RAND.nextInt()) % nRows; |
| conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k1= " + pk + " AND k2=0"); |
| if (i % batchSize == 0) { |
| conn.commit(); |
| } |
| } |
| conn.commit(); |
| for (int i = 0; i < 3000; i++) { |
| int randInt = RAND.nextInt() % nIndexValues; |
| int pk = Math.abs(RAND.nextInt()) % nRows; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (" + pk + ", 0, " + randInt + ")"); |
| if ((i % batchSize) == 0) { |
| conn.commit(); |
| } |
| } |
| conn.commit(); |
| } catch (SQLException e) { |
| throw new RuntimeException(e); |
| } finally { |
| doneSignal.countDown(); |
| } |
| } |
| |
| }; |
| } |
| for (int i = 0; i < nThreads; i++) { |
| Thread t = new Thread(runnables[i]); |
| t.start(); |
| } |
| } |
| @Test |
| public void testConcurrentUpsertsWithRebuild() throws Throwable { |
| int nThreads = 5; |
| final int batchSize = 200; |
| final int nRows = 51; |
| final int nIndexValues = 23; |
| final String schemaName = ""; |
| final String tableName = generateUniqueName(); |
| final String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| Connection conn = DriverManager.getConnection(getUrl()); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)"); |
| |
| final CountDownLatch doneSignal1 = new CountDownLatch(nThreads); |
| mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal1); |
| assertTrue("Ran out of time", doneSignal1.await(120, TimeUnit.SECONDS)); |
| |
| IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE); |
| boolean[] cancel = new boolean[1]; |
| try { |
| do { |
| final CountDownLatch doneSignal2 = new CountDownLatch(nThreads); |
| runIndexRebuilderAsync(500,cancel,fullTableName); |
| mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2); |
| assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS)); |
| } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| } finally { |
| cancel[0] = true; |
| } |
| long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| assertEquals(nRows, actualRowCount); |
| } |
| |
| private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows) throws Exception { |
| return mutateRandomly(conn, fullTableName, nRows, false, null); |
| } |
| |
| private static boolean hasInactiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException { |
| PTable table = metaCache.getTableRef(key).getTable(); |
| for (PTable index : table.getIndexes()) { |
| if (index.getIndexState() == PIndexState.INACTIVE) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException { |
| return hasIndexWithState(metaCache, key, PIndexState.DISABLE); |
| } |
| |
| private static boolean hasIndexWithState(PMetaData metaCache, PTableKey key, PIndexState expectedState) throws TableNotFoundException { |
| PTable table = metaCache.getTableRef(key).getTable(); |
| for (PTable index : table.getIndexes()) { |
| if (index.getIndexState() == expectedState) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows, boolean checkForInactive, String fullIndexName) throws SQLException, InterruptedException { |
| PTableKey key = new PTableKey(null,fullTableName); |
| PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); |
| boolean hasInactiveIndex = false; |
| int batchSize = 200; |
| if (checkForInactive) { |
| batchSize = 3; |
| } |
| for (int i = 0; i < 10000; i++) { |
| int pk = Math.abs(RAND.nextInt()) % nRows; |
| int v1 = Math.abs(RAND.nextInt()) % nRows; |
| int v2 = Math.abs(RAND.nextInt()) % nRows; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(" + pk + "," + v1 + "," + v2 + ")"); |
| if (i % batchSize == 0) { |
| conn.commit(); |
| if (checkForInactive) { |
| if (hasInactiveIndex(metaCache, key)) { |
| checkForInactive = false; |
| hasInactiveIndex = true; |
| batchSize = 200; |
| } |
| } |
| } |
| } |
| conn.commit(); |
| for (int i = 0; i < 10000; i++) { |
| int pk = Math.abs(RAND.nextInt()) % nRows; |
| conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k= " + pk); |
| if (i % batchSize == 0) { |
| conn.commit(); |
| if (checkForInactive) { |
| if (hasInactiveIndex(metaCache, key)) { |
| checkForInactive = false; |
| hasInactiveIndex = true; |
| batchSize = 200; |
| } |
| } |
| } |
| } |
| conn.commit(); |
| for (int i = 0; i < 10000; i++) { |
| int pk = Math.abs(RAND.nextInt()) % nRows; |
| int v1 = Math.abs(RAND.nextInt()) % nRows; |
| int v2 = Math.abs(RAND.nextInt()) % nRows; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(" + pk + "," + v1 + "," + v2 + ")"); |
| if (i % batchSize == 0) { |
| conn.commit(); |
| if (checkForInactive) { |
| if (hasInactiveIndex(metaCache, key)) { |
| checkForInactive = false; |
| hasInactiveIndex = true; |
| batchSize = 200; |
| } |
| } |
| } |
| } |
| conn.commit(); |
| return hasInactiveIndex; |
| } |
| |
| @Test |
| @Repeat(5) |
| public void testDeleteAndUpsertAfterFailure() throws Throwable { |
| final int nRows = 10; |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); |
| mutateRandomly(conn, fullTableName, nRows); |
| long disableTS = EnvironmentEdgeManager.currentTimeMillis(); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); |
| boolean[] cancel = new boolean[1]; |
| try { |
| runIndexRebuilderAsync(500,cancel,fullTableName); |
| mutateRandomly(conn, fullTableName, nRows); |
| TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); |
| } finally { |
| cancel[0] = true; |
| } |
| |
| long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| assertEquals(nRows,actualRowCount); |
| } |
| } |
| |
| @Test |
| public void testWriteWhileRebuilding() throws Throwable { |
| final int nRows = 10; |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); |
| mutateRandomly(conn, fullTableName, nRows); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| final boolean[] hasInactiveIndex = new boolean[1]; |
| final CountDownLatch doneSignal = new CountDownLatch(1); |
| Runnable r = new Runnable() { |
| |
| @Override |
| public void run() { |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| hasInactiveIndex[0] = mutateRandomly(conn, fullTableName, nRows, true, fullIndexName); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } finally { |
| doneSignal.countDown(); |
| } |
| } |
| |
| }; |
| Thread t = new Thread(r); |
| t.setDaemon(true); |
| t.start(); |
| long disableTS = EnvironmentEdgeManager.currentTimeMillis(); |
| IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); |
| boolean[] cancel = new boolean[1]; |
| try { |
| runIndexRebuilderAsync(500,cancel,fullTableName); |
| TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); |
| doneSignal.await(60, TimeUnit.SECONDS); |
| } finally { |
| cancel[0] = true; |
| } |
| assertTrue(hasInactiveIndex[0]); |
| |
| long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| assertEquals(nRows,actualRowCount); |
| |
| } |
| } |
| |
| @Test |
| public void testMultiVersionsAfterFailure() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); |
| conn.commit(); |
| long disableTS = EnvironmentEdgeManager.currentTimeMillis(); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); |
| conn.commit(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd')"); |
| conn.commit(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')"); |
| conn.commit(); |
| runIndexRebuilder(fullTableName); |
| Thread.sleep(WAIT_AFTER_DISABLED); |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } |
| } |
| |
| @Test |
| public void testUpsertNullAfterFailure() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); |
| conn.commit(); |
| long disableTS = EnvironmentEdgeManager.currentTimeMillis(); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); |
| conn.commit(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); |
| conn.commit(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); |
| conn.commit(); |
| runIndexRebuilder(fullTableName); |
| Thread.sleep(WAIT_AFTER_DISABLED); |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } |
| } |
| |
| @Test |
| public void testUpsertNullTwiceAfterFailure() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); |
| conn.commit(); |
| long disableTS = EnvironmentEdgeManager.currentTimeMillis(); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); |
| conn.commit(); |
| conn.createStatement().execute("DELETE FROM " + fullTableName); |
| conn.commit(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); |
| conn.commit(); |
| runIndexRebuilder(fullTableName); |
| Thread.sleep(WAIT_AFTER_DISABLED); |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } |
| } |
| |
| @Test |
| public void testDeleteAfterFailure() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); |
| conn.commit(); |
| long disableTS = EnvironmentEdgeManager.currentTimeMillis(); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')"); |
| conn.commit(); |
| conn.createStatement().execute("DELETE FROM " + fullTableName); |
| conn.commit(); |
| runIndexRebuilder(fullTableName); |
| Thread.sleep(WAIT_AFTER_DISABLED); |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } |
| } |
| |
| @Test |
| public void testDeleteBeforeFailure() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); |
| conn.commit(); |
| conn.createStatement().execute("DELETE FROM " + fullTableName); |
| conn.commit(); |
| long disableTS = EnvironmentEdgeManager.currentTimeMillis(); |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')"); |
| conn.commit(); |
| runIndexRebuilder(fullTableName); |
| Thread.sleep(WAIT_AFTER_DISABLED); |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } |
| } |
| |
| private static class MyClock extends EnvironmentEdge { |
| public volatile long time; |
| |
| public MyClock (long time) { |
| this.time = time; |
| } |
| |
| @Override |
| public long currentTime() { |
| return time; |
| } |
| } |
| |
| private static void waitForIndexState(Connection conn, String fullTableName, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException { |
| int nRetries = 2; |
| PIndexState actualIndexState = null; |
| do { |
| runIndexRebuilder(fullTableName); |
| if ((actualIndexState = TestUtil.getIndexState(conn, fullIndexName)) == expectedIndexState) { |
| return; |
| } |
| Thread.sleep(1000); |
| } while (--nRetries > 0); |
| fail("Expected index state of " + expectedIndexState + ", but was " + actualIndexState); |
| } |
| |
| @Test |
| public void testMultiValuesAtSameTS() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); |
| conn.commit(); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); |
| conn.commit(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); |
| conn.commit(); |
| clock.time += 1000; |
| waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testTimeBatchesInCoprocessorRequired() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| PTableKey key = new PTableKey(null,fullTableName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); |
| conn.commit(); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); |
| clock.time += 100; |
| long disableTime = clock.currentTime(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11')"); |
| conn.commit(); |
| clock.time += 100; |
| assertTrue(hasDisabledIndex(metaCache, key)); |
| assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); |
| assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','0')"); |
| conn.commit(); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); |
| conn.commit(); |
| clock.time += 100; |
| IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); |
| clock.time += 100; |
| waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testBatchingDuringRebuild() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| PTableKey key = new PTableKey(null,fullTableName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); |
| conn.commit(); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| long disableTime = clock.currentTime(); |
| IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')"); |
| conn.commit(); |
| clock.time += REBUILD_PERIOD; |
| assertTrue(hasDisabledIndex(metaCache, key)); |
| assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); |
| assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')"); |
| conn.commit(); |
| assertEquals(3,TestUtil.getRowCount(conn, fullTableName)); |
| assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); |
| clock.time += 100; |
| |
| waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| runIndexRebuilder(fullTableName); |
| assertEquals(2,TestUtil.getRowCount(conn, fullIndexName)); |
| |
| clock.time += REBUILD_PERIOD; |
| runIndexRebuilder(fullTableName); |
| // Verify that other batches were processed |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testUpperBoundSetOnRebuild() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| PTableKey key = new PTableKey(null,fullTableName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); |
| clock.time += 100; |
| long disableTime = clock.currentTime(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a', '0')"); |
| conn.commit(); |
| // Set clock forward in time past the "overlap" amount we wait for index maintenance to kick in |
| clock.time += 2 * WAIT_AFTER_DISABLED; |
| assertTrue(hasDisabledIndex(metaCache, key)); |
| assertEquals(1,TestUtil.getRowCount(conn, fullTableName)); |
| assertEquals(0,TestUtil.getRowCount(conn, fullIndexName)); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb','11')"); |
| conn.commit(); |
| assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); |
| assertEquals(0,TestUtil.getRowCount(conn, fullIndexName)); |
| // Set clock back in time and start rebuild |
| clock.time = disableTime + 100; |
| IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); |
| waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); |
| // If an upper bound was set on the rebuilder, we should only have found one row |
| assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testMultiValuesWhenDisableAndInactive() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| PTableKey key = new PTableKey(null,fullTableName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')"); |
| conn.commit(); |
| clock.time += 100; |
| try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { |
| // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering |
| IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); |
| clock.time += 100; |
| long disableTime = clock.currentTime(); |
| // Set some values while index disabled |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')"); |
| conn.commit(); |
| clock.time += 100; |
| assertTrue(hasDisabledIndex(metaCache, key)); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')"); |
| conn.commit(); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')"); |
| conn.commit(); |
| clock.time += 100; |
| // Will cause partial index rebuilder to be triggered |
| IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); |
| } |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)); |
| |
| // Set some values while index is in INACTIVE state |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')"); |
| conn.commit(); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')"); |
| conn.commit(); |
| clock.time += WAIT_AFTER_DISABLED; |
| // Enough time has passed, so rebuild will start now |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testIndexWriteFailureDisablingIndex() throws Throwable { |
| testIndexWriteFailureDuringRebuild(PIndexState.DISABLE); |
| } |
| |
| @Test |
| public void testIndexWriteFailureLeavingIndexActive() throws Throwable { |
| testIndexWriteFailureDuringRebuild(PIndexState.PENDING_ACTIVE); |
| } |
| |
| private void testIndexWriteFailureDuringRebuild(PIndexState indexStateOnFailure) throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| PTableKey key = new PTableKey(null,fullTableName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = " + (indexStateOnFailure == PIndexState.DISABLE)); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); |
| conn.commit(); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| |
| long disableTime = clock.currentTime(); |
| // Simulates an index write failure |
| IndexUtil.updateIndexState(fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? disableTime : -disableTime, metaTable, indexStateOnFailure); |
| |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')"); |
| conn.commit(); |
| |
| // Large enough to be in separate time batch |
| clock.time += 2 * REBUILD_PERIOD; |
| assertTrue(hasIndexWithState(metaCache, key, indexStateOnFailure)); |
| assertEquals(2,TestUtil.getRowCount(conn, fullTableName)); |
| assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')"); |
| conn.commit(); |
| assertEquals(3,TestUtil.getRowCount(conn, fullTableName)); |
| assertEquals(1,TestUtil.getRowCount(conn, fullIndexName)); |
| clock.time += 100; |
| |
| waitForIndexState(conn, fullTableName, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| |
| // First batch should have been processed |
| runIndexRebuilder(fullTableName); |
| assertEquals(2,TestUtil.getRowCount(conn, fullIndexName)); |
| |
| // Simulate write failure |
| TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('dddd','dddd','3333')"); |
| try { |
| conn.commit(); |
| fail(); |
| } catch (CommitException e) { |
| // Expected |
| } |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure, null)); |
| PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); |
| ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); |
| assertTrue(rs.next()); |
| assertEquals("0", rs.getString(1)); |
| assertEquals(indexStateOnFailure == PIndexState.DISABLE ? fullTableName : fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); |
| TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); |
| |
| clock.time += 1000; |
| waitForIndexState(conn, fullTableName, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| |
| // First batch should have been processed again because we started over |
| runIndexRebuilder(fullTableName); |
| assertEquals(3,TestUtil.getRowCount(conn, fullIndexName)); |
| |
| clock.time += 2 * REBUILD_PERIOD; |
| // Second batch should have been processed now |
| runIndexRebuilder(fullTableName); |
| clock.time += 2 * REBUILD_PERIOD; |
| runIndexRebuilder(fullTableName); |
| TestUtil.assertIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L); |
| |
| // Verify that other batches were processed |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); |
| conn.commit(); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'"); |
| conn.commit(); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); |
| conn.commit(); |
| clock.time += 1000; |
| waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testDeleteAndUpsertValuesAtSameTS2() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); |
| conn.commit(); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), metaTable, PIndexState.DISABLE); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); |
| conn.commit(); |
| conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'"); |
| conn.commit(); |
| clock.time += 1000; |
| waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE); |
| clock.time += WAIT_AFTER_DISABLED; |
| runIndexRebuilder(fullTableName); |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); |
| IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); |
| } finally { |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| @Test |
| public void testRegionsOnlineCheck() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| PTableKey key = new PTableKey(null,fullTableName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY)"); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a')"); |
| conn.commit(); |
| Configuration conf = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); |
| PTable table = metaCache.getTableRef(key).getTable(); |
| assertTrue(MetaDataUtil.tableRegionsOnline(conf, table)); |
| try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { |
| admin.disableTable(fullTableName); |
| assertFalse(MetaDataUtil.tableRegionsOnline(conf, table)); |
| admin.enableTable(fullTableName); |
| } |
| assertTrue(MetaDataUtil.tableRegionsOnline(conf, table)); |
| } |
| } |
| |
| // Tests that when we've been in PENDING_DISABLE for too long, queries don't use the index, |
| // and the rebuilder should mark the index DISABLED |
| @Test |
| public void testPendingDisable() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| final MyClock clock = new MyClock(1000); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = TRUE"); |
| clock.time += 100; |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| clock.time += 100; |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); |
| conn.commit(); |
| clock.time += 100; |
| HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), metaTable, PIndexState.PENDING_DISABLE); |
| Configuration conf = |
| conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); |
| // under threshold should use the index |
| PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); |
| ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); |
| assertTrue(rs.next()); |
| assertEquals("0", rs.getString(1)); |
| assertEquals(fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); |
| // over threshold should not use the index |
| long pendingDisableThreshold = conf.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, |
| QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); |
| clock.time += pendingDisableThreshold + 1000; |
| stmt = conn.createStatement().unwrap(PhoenixStatement.class); |
| rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); |
| assertTrue(rs.next()); |
| assertEquals("0", rs.getString(1)); |
| assertEquals(fullTableName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); |
| // if we're over the threshold, the rebuilder should disable the index |
| waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.DISABLE); |
| } finally { |
| EnvironmentEdgeManager.reset(); |
| } |
| } |
| |
| //Tests that when we're updating an index from within the RS (e.g. UngruopedAggregateRegionObserver), |
| // if the index write fails the index gets disabled |
| @Test |
| public void testIndexFailureWithinRSDisablesIndex() throws Throwable { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName = generateUniqueName(); |
| final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| try { |
| conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) DISABLE_INDEX_ON_WRITE_FAILURE = TRUE"); |
| conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); |
| conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0', 't')"); |
| conn.commit(); |
| // Simulate write failure |
| TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); |
| conn.setAutoCommit(true); |
| try { |
| conn.createStatement().execute("DELETE FROM " + fullTableName); |
| fail(); |
| } catch (SQLException e) { |
| // expected |
| } |
| assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null)); |
| } finally { |
| TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); |
| } |
| } |
| } |
| |
| public static class WriteFailingRegionObserver extends SimpleRegionObserver { |
| @Override |
| public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { |
| // we need to advance the clock, since the index retry logic (copied from HBase) has a time component |
| EnvironmentEdge delegate = EnvironmentEdgeManager.getDelegate(); |
| if (delegate instanceof MyClock) { |
| MyClock myClock = (MyClock) delegate; |
| myClock.time += 1000; |
| } |
| throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString()); |
| } |
| } |
| } |