blob: 91d793e966bad70a803cc3a0655be7e1f0d78fe2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.phoenix.iterate.ScanningResultPostDummyResultCaller;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Uncovered index tests that include some region moves while performing rs#next.
*/
@Category(NeedsOwnMiniClusterTest.class)
public class UncoveredIndexWithRegionMovesIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(UncoveredIndexWithRegionMovesIT.class);
private static boolean hasTestStarted = false;
private static int countOfDummyResults = 0;
private static final List<String> TABLE_NAMES = Collections.synchronizedList(new ArrayList<>());
private static class TestScanningResultPostDummyResultCaller extends
ScanningResultPostDummyResultCaller {
@Override
public void postDummyProcess() {
if (hasTestStarted && (countOfDummyResults++ % 3) == 0 &&
(countOfDummyResults < 17 ||
countOfDummyResults > 28 && countOfDummyResults < 40)) {
LOGGER.info("Moving regions of tables {}. current count of dummy results: {}",
TABLE_NAMES, countOfDummyResults);
TABLE_NAMES.forEach(table -> {
try {
moveRegionsOfTable(table);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
}
}
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, String.valueOf(2));
props.put(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, String.valueOf(1));
props.put(QueryServices.PHOENIX_POST_DUMMY_PROCESS,
TestScanningResultPostDummyResultCaller.class.getName());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@After
public void tearDown() throws Exception {
TABLE_NAMES.clear();
hasTestStarted = false;
countOfDummyResults = 0;
}
private void assertServerPagingMetric(String tableName, ResultSet rs, boolean isPaged)
throws SQLException {
Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
assertEquals(String.format("Got %s", entry.getKey()), tableName, entry.getKey());
Map<MetricType, Long> metricValues = entry.getValue();
Long pagedRowsCntr = metricValues.get(MetricType.PAGED_ROWS_COUNTER);
assertNotNull(pagedRowsCntr);
if (isPaged) {
assertTrue(String.format("Got %d", pagedRowsCntr), pagedRowsCntr > 0);
} else {
assertEquals(String.format("Got %d", pagedRowsCntr), 0, (long) pagedRowsCntr);
}
}
assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0);
}
private static void moveRegionsOfTable(String tableName)
throws IOException {
try (AsyncConnection asyncConnection =
ConnectionFactory.createAsyncConnection(getUtility().getConfiguration())
.get()) {
AsyncAdmin admin = asyncConnection.getAdmin();
List<ServerName> servers =
new ArrayList<>(admin.getRegionServers().get());
ServerName server1 = servers.get(0);
ServerName server2 = servers.get(1);
List<RegionInfo> regionsOnServer1;
regionsOnServer1 = admin.getRegions(server1).get();
List<RegionInfo> regionsOnServer2;
regionsOnServer2 = admin.getRegions(server2).get();
regionsOnServer1.forEach(regionInfo -> {
if (regionInfo.getTable().equals(TableName.valueOf(tableName))) {
try {
for (int i = 0; i < 2; i++) {
RegionStatesCount regionStatesCount =
admin.getClusterMetrics().get().getTableRegionStatesCount()
.get(TableName.valueOf(tableName));
if (regionStatesCount.getRegionsInTransition() == 0 &&
regionStatesCount.getOpenRegions() ==
regionStatesCount.getTotalRegions()) {
LOGGER.info("Moving region {} to {}",
regionInfo.getRegionNameAsString(), server2);
admin.move(regionInfo.getEncodedNameAsBytes(), server2).get(3,
TimeUnit.SECONDS);
break;
} else {
LOGGER.info("Table {} has some region(s) in RIT or not online",
tableName);
}
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error("Something went wrong", e);
throw new RuntimeException(e);
}
}
});
regionsOnServer2.forEach(regionInfo -> {
if (regionInfo.getTable().equals(TableName.valueOf(tableName))) {
try {
for (int i = 0; i < 2; i++) {
RegionStatesCount regionStatesCount =
admin.getClusterMetrics().get().getTableRegionStatesCount()
.get(TableName.valueOf(tableName));
if (regionStatesCount.getRegionsInTransition() == 0 &&
regionStatesCount.getOpenRegions() ==
regionStatesCount.getTotalRegions()) {
admin.move(regionInfo.getEncodedNameAsBytes(), server1)
.get(3, TimeUnit.SECONDS);
LOGGER.info("Moving region {} to {}",
regionInfo.getRegionNameAsString(), server1);
break;
} else {
LOGGER.info("Table {} has some region(s) in RIT or not online",
tableName);
}
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error("Something went wrong", e);
throw new RuntimeException(e);
}
}
});
} catch (Exception e) {
LOGGER.error("Something went wrong..", e);
}
}
@Test
public void testUncoveredQueryWithGroupBy() throws Exception {
hasTestStarted = true;
String dataTableName = generateUniqueName();
populateTable(
dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
try (Connection conn = DriverManager.getConnection(getUrl())) {
String indexTableName = generateUniqueName();
conn.createStatement().execute("CREATE UNCOVERED INDEX "
+ indexTableName + " on " + dataTableName + " (val1) ");
TABLE_NAMES.add(dataTableName);
TABLE_NAMES.add(indexTableName);
String selectSql;
int limit = 10;
// Verify that an index hint is not necessary for an uncovered index
selectSql = "SELECT val2, val3 from " + dataTableName +
" WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde') LIMIT " + limit;
assertExplainPlanWithLimit(conn, selectSql, dataTableName, indexTableName, limit);
ResultSet rs = conn.createStatement().executeQuery(selectSql);
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("bcd", rs.getString(1));
assertEquals("bcde", rs.getString(2));
assertFalse(rs.next());
assertServerPagingMetric(indexTableName, rs, true);
// Add another row and run a group by query where the uncovered index should be used
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
conn.commit();
selectSql = "SELECT count(val3) from " + dataTableName
+ " where val1 > '0' GROUP BY val1";
// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
rs = conn.createStatement().executeQuery(selectSql);
TestUtil.dumpTable(conn, TableName.valueOf(dataTableName));
TestUtil.dumpTable(conn, TableName.valueOf(indexTableName));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertFalse(rs.next());
selectSql = "SELECT count(val3) from " + dataTableName + " where val1 > '0'";
// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
rs = conn.createStatement().executeQuery(selectSql);
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
// Run an order by query where the uncovered index should be used
selectSql = "SELECT val3 from " + dataTableName + " where val1 > '0' ORDER BY val1";
// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
rs = conn.createStatement().executeQuery(selectSql);
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("abcd", rs.getString(1));
assertTrue(rs.next());
assertEquals("cdef", rs.getString(1));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("bcde", rs.getString(1));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertFalse(rs.next());
}
}
@Test
public void testUncoveredQuery() throws Exception {
testUncoveredUtil(false);
}
@Test
public void testUncoveredQueryWithLimit() throws Exception {
testUncoveredUtil(true);
}
private void testUncoveredUtil(boolean limit) throws Exception {
hasTestStarted = true;
String dataTableName = generateUniqueName();
populateTable(dataTableName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
String indexTableName = generateUniqueName();
conn.createStatement().execute("CREATE UNCOVERED INDEX "
+ indexTableName + " on " + dataTableName + " (val1) ");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('c', 'cd','cde', 'cdef')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('d', 'de','de1', 'de11')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('e', 'ef','ef1', 'ef11')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('f', 'fg','fg1', 'fg11')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('g', 'gh','gh1', 'gh11')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('h', 'hi','hi1', 'hi11')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('i', 'ij','ij1', 'ij11')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('j', 'jk','jk1', 'jk11')");
conn.createStatement().execute("upsert into " + dataTableName
+ " (id, val1, val2, val3) values ('k', 'kl','kl1', 'kl11')");
conn.commit();
TABLE_NAMES.add(dataTableName);
TABLE_NAMES.add(indexTableName);
String selectSql;
// Verify that an index hint is not necessary for an uncovered index
selectSql = "SELECT val2, val3 from " + dataTableName +
" WHERE val1 IS NOT NULL" + (limit ? " LIMIT 15" : "");
ResultSet rs = conn.createStatement().executeQuery(selectSql);
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("abc", rs.getString(1));
assertEquals("abcd", rs.getString(2));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("bcd", rs.getString(1));
assertEquals("bcde", rs.getString(2));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("cde", rs.getString(1));
assertEquals("cdef", rs.getString(2));
assertTrue(rs.next());
assertEquals("de1", rs.getString(1));
assertEquals("de11", rs.getString(2));
assertTrue(rs.next());
assertEquals("ef1", rs.getString(1));
assertEquals("ef11", rs.getString(2));
assertTrue(rs.next());
assertEquals("fg1", rs.getString(1));
assertEquals("fg11", rs.getString(2));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("gh1", rs.getString(1));
assertEquals("gh11", rs.getString(2));
assertTrue(rs.next());
assertEquals("hi1", rs.getString(1));
assertEquals("hi11", rs.getString(2));
moveRegionsOfTable(dataTableName);
moveRegionsOfTable(indexTableName);
assertTrue(rs.next());
assertEquals("ij1", rs.getString(1));
assertEquals("ij11", rs.getString(2));
assertTrue(rs.next());
assertEquals("jk1", rs.getString(1));
assertEquals("jk11", rs.getString(2));
assertTrue(rs.next());
assertEquals("kl1", rs.getString(1));
assertEquals("kl11", rs.getString(2));
assertFalse(rs.next());
assertServerPagingMetric(indexTableName, rs, true);
}
}
private void populateTable(String tableName) throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute("create table " + tableName +
" (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10)," +
" val3 varchar(10))");
conn.createStatement().execute("upsert into " + tableName
+ " values ('a', 'ab', 'abc', 'abcd')");
conn.commit();
conn.createStatement().execute("upsert into " + tableName
+ " values ('b', 'bc', 'bcd', 'bcde')");
conn.commit();
conn.close();
}
}