/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.schema.stats;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.getAllSplits;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.common.collect.Maps;
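/**
 * Base integration test for statistics collection (UPDATE STATISTICS / guideposts).
 * Parameterized over mutability, transactional-ness, namespace mapping, and column encoding;
 * the constructor below translates those flags into table DDL options, and concrete
 * subclasses are expected to supply the parameter combinations.
 */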
@RunWith(Parameterized.class)
public abstract class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
private final String tableDDLOptions;
private final boolean columnEncoded;
private String tableName;
private String schemaName;
private String fullTableName;
private String physicalTableName;
private final boolean userTableNamespaceMapped;
private final boolean mutable;
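// A deliberately tiny guide post width (in bytes) so that even the small test tables
// generate multiple guideposts; pushed to both client and server in doSetup().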
private static final int defaultGuidePostWidth = 20;
protected StatsCollectorIT(boolean mutable, boolean transactional, boolean userTableNamespaceMapped, boolean columnEncoded) {
StringBuilder sb = new StringBuilder();
if (transactional) {
sb.append("TRANSACTIONAL=true");
}
if (!columnEncoded) {
if (sb.length()>0) {
sb.append(",");
}
sb.append("COLUMN_ENCODED_BYTES=0");
} else {
if (sb.length()>0) {
sb.append(",");
}
sb.append("COLUMN_ENCODED_BYTES=4");
}
if (!mutable) {
if (sb.length()>0) {
sb.append(",");
}
sb.append("IMMUTABLE_ROWS=true");
if (!columnEncoded) {
sb.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
}
}
this.tableDDLOptions = sb.toString();
this.userTableNamespaceMapped = userTableNamespaceMapped;
this.columnEncoded = columnEncoded;
this.mutable = mutable;
}
@BeforeClass
public static void doSetup() throws Exception {
// Enable namespace mapping at the global level on both the client and server side
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(defaultGuidePostWidth));
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
clientProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(defaultGuidePostWidth));
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
@Before
public void generateTableNames() throws SQLException {
schemaName = generateUniqueName();
if (userTableNamespaceMapped) {
try (Connection conn = getConnection()) {
conn.createStatement().execute("CREATE SCHEMA " + schemaName);
}
}
tableName = "T_" + generateUniqueName();
fullTableName = SchemaUtil.getTableName(schemaName, tableName);
physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, userTableNamespaceMapped).getString();
}
private Connection getConnection() throws SQLException {
return getConnection(Integer.MAX_VALUE);
}
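// statsUpdateFreq is set as STATS_UPDATE_FREQ_MS_ATTRIB, which controls how often the client
// refreshes its cached statistics; Integer.MAX_VALUE effectively disables the automatic
// refresh so tests control when stats are (re)read.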
private Connection getConnection(Integer statsUpdateFreq) throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
props.setProperty(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(statsUpdateFreq));
// enable/disable namespace mapping at connection level
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(userTableNamespaceMapped));
return DriverManager.getConnection(getUrl(), props);
}
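// UPDATE STATISTICS on an empty table should still produce a single (empty) guidepost,
// which shows up in the explain plan as one chunk with zero rows.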
@Test
public void testUpdateEmptyStats() throws Exception {
Connection conn = getConnection();
conn.setAutoCommit(true);
conn.createStatement().execute(
"CREATE TABLE " + fullTableName +" ( k CHAR(1) PRIMARY KEY )" + tableDDLOptions);
conn.createStatement().execute("UPDATE STATISTICS " + fullTableName);
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName);
String explainPlan = QueryUtil.getExplainPlan(rs);
assertEquals(
"CLIENT 1-CHUNK 0 ROWS 20 BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName + "\n" +
" SERVER FILTER BY FIRST KEY ONLY",
explainPlan);
conn.close();
}
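// Upserts a single row into a salted table (3 buckets) that only populates column family A,
// then verifies the per-column-family chunk/row/byte estimates in the explain plans.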
@Test
public void testSomeUpdateEmptyStats() throws Exception {
Connection conn = getConnection();
conn.setAutoCommit(true);
conn.createStatement().execute(
"CREATE TABLE " + fullTableName +" ( k VARCHAR PRIMARY KEY, a.v1 VARCHAR, b.v2 VARCHAR ) " + tableDDLOptions + (tableDDLOptions.isEmpty() ? "" : ",") + "SALT_BUCKETS = 3");
conn.createStatement().execute("UPSERT INTO " + fullTableName + "(k,v1) VALUES('a','123456789')");
conn.createStatement().execute("UPDATE STATISTICS " + fullTableName);
ResultSet rs;
String explainPlan;
rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + fullTableName + " WHERE v2='foo'");
explainPlan = QueryUtil.getExplainPlan(rs);
// If we are using the ONE_CELL_PER_COLUMN_FAMILY storage scheme (column encoded and immutable), there will be a single KV even though there are no values for column family V2
String stats = columnEncoded && !mutable ? "4-CHUNK 1 ROWS 38 BYTES" : "3-CHUNK 0 ROWS 20 BYTES";
assertEquals(
"CLIENT " + stats + " PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" +
" SERVER FILTER BY B.V2 = 'foo'\n" +
"CLIENT MERGE SORT",
explainPlan);
rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName);
explainPlan = QueryUtil.getExplainPlan(rs);
assertEquals(
"CLIENT 4-CHUNK 1 ROWS " + (columnEncoded ? "28" : "34") + " BYTES PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" +
"CLIENT MERGE SORT",
explainPlan);
rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName + " WHERE k = 'a'");
explainPlan = QueryUtil.getExplainPlan(rs);
assertEquals(
"CLIENT 1-CHUNK 1 ROWS " + (columnEncoded ? "204" : "202") + " BYTES PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + physicalTableName + "\n" +
"CLIENT MERGE SORT",
explainPlan);
conn.close();
}
@Test
public void testUpdateStats() throws SQLException, IOException,
InterruptedException {
Connection conn;
PreparedStatement stmt;
ResultSet rs;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
conn = getConnection();
conn.createStatement().execute(
"CREATE TABLE " + fullTableName +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
+ " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))"
+ tableDDLOptions );
String[] s;
Array array;
conn = upsertValues(props, fullTableName);
// Call the UPDATE STATISTICS query here. If a major compaction has already run, this will not get executed.
stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName);
stmt.execute();
stmt = upsertStmt(conn, fullTableName);
stmt.setString(1, "z");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName);
stmt.execute();
rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName);
assertTrue(rs.next());
conn.close();
}
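// Verifies that a descending scan over the primary key returns each row exactly once after
// UPDATE STATISTICS, both with and without an explicit SPLIT ON clause (see the two tests below).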
private void testNoDuplicatesAfterUpdateStats(String splitKey) throws Throwable {
Connection conn = getConnection();
PreparedStatement stmt;
ResultSet rs;
conn.createStatement()
.execute("CREATE TABLE " + fullTableName
+ " ( k VARCHAR, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k))"+ tableDDLOptions
+ (splitKey != null ? " split on (" + splitKey + ")" : "") );
conn.createStatement().execute("upsert into " + fullTableName + " values ('abc',1,3)");
conn.createStatement().execute("upsert into " + fullTableName + " values ('def',2,4)");
conn.commit();
conn.createStatement().execute("UPDATE STATISTICS " + fullTableName);
rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName + " order by k desc");
assertTrue(rs.next());
assertEquals("def", rs.getString(1));
assertTrue(rs.next());
assertEquals("abc", rs.getString(1));
assertFalse(rs.next());
conn.close();
}
@Test
public void testNoDuplicatesAfterUpdateStatsWithSplits() throws Throwable {
testNoDuplicatesAfterUpdateStats("'abc','def'");
}
@Test
public void testNoDuplicatesAfterUpdateStatsWithDesc() throws Throwable {
testNoDuplicatesAfterUpdateStats(null);
}
@Test
public void testUpdateStatsWithMultipleTables() throws Throwable {
String fullTableName2 = SchemaUtil.getTableName(schemaName, "T_" + generateUniqueName());
Connection conn;
PreparedStatement stmt;
ResultSet rs;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
conn = getConnection();
conn.createStatement().execute(
"CREATE TABLE " + fullTableName +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
+ " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions );
conn.createStatement().execute(
"CREATE TABLE " + fullTableName2 +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
+ " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions );
String[] s;
Array array;
conn = upsertValues(props, fullTableName);
conn = upsertValues(props, fullTableName2);
// Call the UPDATE STATISTICS query here
stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName);
stmt.execute();
stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2);
stmt.execute();
stmt = upsertStmt(conn, fullTableName);
stmt.setString(1, "z");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
stmt = upsertStmt(conn, fullTableName2);
stmt.setString(1, "z");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
conn.close();
conn = getConnection();
// This analyze is not expected to collect anything new
stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2);
stmt.execute();
rs = conn.createStatement().executeQuery("SELECT k FROM "+fullTableName2);
assertTrue(rs.next());
conn.close();
}
private Connection upsertValues(Properties props, String tableName) throws SQLException, IOException,
InterruptedException {
Connection conn;
PreparedStatement stmt;
conn = getConnection();
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "a");
String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" };
Array array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "abc", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
conn.commit();
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "b");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
conn.commit();
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "c");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
conn.commit();
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "d");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
conn.commit();
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "b");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
conn.commit();
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "e");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(2, array);
s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
conn.commit();
return conn;
}
private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {
PreparedStatement stmt;
stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
return stmt;
}
private void compactTable(Connection conn, String tableName) throws Exception {
TestUtil.doMajorCompaction(conn, tableName);
}
@Test
@Ignore //TODO remove this once https://issues.apache.org/jira/browse/TEPHRA-208 is fixed
public void testCompactUpdatesStats() throws Exception {
testCompactUpdatesStats(0, fullTableName);
}
@Test
@Ignore //TODO remove this once https://issues.apache.org/jira/browse/TEPHRA-208 is fixed
public void testCompactUpdatesStatsWithMinStatsUpdateFreq() throws Exception {
testCompactUpdatesStats(QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS, fullTableName);
}
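// Evicts the cached guideposts for the table's empty column family so that the next query
// has to re-read statistics from SYSTEM.STATS.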
private static void invalidateStats(Connection conn, String tableName) throws SQLException {
PTable ptable = conn.unwrap(PhoenixConnection.class)
.getMetaDataCache().getTableRef(new PTableKey(null, tableName))
.getTable();
byte[] name = ptable.getPhysicalName().getBytes();
conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(new GuidePostsKey(name, SchemaUtil.getEmptyColumnFamily(ptable)));
}
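// Loads nRows rows, major-compacts, and verifies the expected number of splits (guideposts);
// then deletes half the rows, compacts again, and checks that GUIDE_POSTS_ROW_COUNT in
// SYSTEM.STATS reflects the deletes. Currently exercised only by the two @Ignore'd tests above.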
private void testCompactUpdatesStats(Integer statsUpdateFreq, String tableName) throws Exception {
int nRows = 10;
Connection conn = getConnection(statsUpdateFreq);
PreparedStatement stmt;
conn.createStatement().execute("CREATE TABLE " + tableName + "(k CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) "
+ (!tableDDLOptions.isEmpty() ? tableDDLOptions + "," : "")
+ HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
for (int i = 0; i < nRows; i++) {
stmt.setString(1, Character.toString((char) ('a' + i)));
stmt.setInt(2, i);
stmt.setInt(3, i);
stmt.executeUpdate();
}
conn.commit();
compactTable(conn, physicalTableName);
if (statsUpdateFreq != 0) {
invalidateStats(conn, tableName);
} else {
// Confirm that when STATS_UPDATE_FREQ_MS_ATTRIB is zero, the new statistics are only
// faulted in as expected after we run UPDATE STATISTICS.
List<KeyRange>keyRanges = getAllSplits(conn, tableName);
assertNotEquals(nRows+1, keyRanges.size());
// If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache
// and force the new stats to be pulled over.
int rowCount = conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName);
assertEquals(10, rowCount);
}
List<KeyRange>keyRanges = getAllSplits(conn, tableName);
assertEquals(nRows+1, keyRanges.size());
int nDeletedRows = conn.createStatement().executeUpdate("DELETE FROM " + tableName + " WHERE V < " + nRows / 2);
conn.commit();
assertEquals(5, nDeletedRows);
Scan scan = new Scan();
scan.setRaw(true);
PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
try (HTableInterface htable = phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) {
ResultScanner scanner = htable.getScanner(scan);
Result result;
while ((result = scanner.next())!=null) {
System.out.println(result);
}
}
compactTable(conn, physicalTableName);
scan = new Scan();
scan.setRaw(true);
phxConn = conn.unwrap(PhoenixConnection.class);
try (HTableInterface htable = phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) {
ResultScanner scanner = htable.getScanner(scan);
Result result;
while ((result = scanner.next())!=null) {
System.out.println(result);
}
}
if (statsUpdateFreq != 0) {
invalidateStats(conn, tableName);
} else {
assertEquals(nRows+1, keyRanges.size());
// If we've set STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache
// and force us to pull over the new stats
int rowCount = conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName);
assertEquals(5, rowCount);
}
keyRanges = getAllSplits(conn, tableName);
assertEquals(nRows/2+1, keyRanges.size());
ResultSet rs = conn.createStatement().executeQuery("SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM "
+ "\""+ SYSTEM_CATALOG_SCHEMA + "\".\"" + SYSTEM_STATS_TABLE + "\"" + " WHERE PHYSICAL_NAME='" + physicalTableName + "'");
rs.next();
assertEquals(nRows - nDeletedRows, rs.getLong(1));
}
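// Verifies per-column-family guidepost row counts and byte sums for a table with four column
// families, first with the default guide post width and then after shrinking it to 1000 bytes;
// finally sets GUIDE_POSTS_WIDTH=0 to disable stats and asserts the guideposts are gone.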
@Test
public void testWithMultiCF() throws Exception {
int nRows = 20;
Connection conn = getConnection(0);
PreparedStatement stmt;
conn.createStatement().execute(
"CREATE TABLE " + fullTableName
+ "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER, c.v INTEGER NULL, d.v INTEGER NULL) "
+ tableDDLOptions );
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?, ?, ?, ?)");
byte[] val = new byte[250];
for (int i = 0; i < nRows; i++) {
stmt.setString(1, Character.toString((char)('a' + i)) + Bytes.toString(val));
stmt.setInt(2, i);
stmt.setInt(3, i);
stmt.setInt(4, i);
stmt.setInt(5, i);
stmt.executeUpdate();
}
conn.commit();
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, c.v, d.v) VALUES(?,?,?)");
for (int i = 0; i < 5; i++) {
stmt.setString(1, Character.toString((char)('a' + 'z' + i)) + Bytes.toString(val));
stmt.setInt(2, i);
stmt.setInt(3, i);
stmt.executeUpdate();
}
conn.commit();
ResultSet rs;
TestUtil.analyzeTable(conn, fullTableName);
List<KeyRange> keyRanges = getAllSplits(conn, fullTableName);
assertEquals(26, keyRanges.size());
rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName);
assertEquals("CLIENT 26-CHUNK 25 ROWS " + (columnEncoded ? ( mutable ? "12530" : "13902" ) : "12420") + " BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName,
QueryUtil.getExplainPlan(rs));
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
List<HRegionLocation> regions = services.getAllTableRegions(Bytes.toBytes(physicalTableName));
assertEquals(1, regions.size());
TestUtil.analyzeTable(conn, fullTableName);
String query = "UPDATE STATISTICS " + fullTableName + " SET \""
+ QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(1000);
conn.createStatement().execute(query);
keyRanges = getAllSplits(conn, fullTableName);
boolean oneCellPerColFamilyStorageScheme = !mutable && columnEncoded;
assertEquals(oneCellPerColFamilyStorageScheme ? 13 : 12, keyRanges.size());
rs = conn
.createStatement()
.executeQuery(
"SELECT COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH),COUNT(*) from \"SYSTEM\".STATS where PHYSICAL_NAME = '"
+ physicalTableName + "' GROUP BY COLUMN_FAMILY ORDER BY COLUMN_FAMILY");
assertTrue(rs.next());
assertEquals("A", rs.getString(1));
assertEquals(24, rs.getInt(2));
assertEquals(columnEncoded ? ( mutable ? 12252 : 13624 ) : 12144, rs.getInt(3));
assertEquals(oneCellPerColFamilyStorageScheme ? 12 : 11, rs.getInt(4));
assertTrue(rs.next());
assertEquals("B", rs.getString(1));
assertEquals(oneCellPerColFamilyStorageScheme ? 24 : 20, rs.getInt(2));
assertEquals(columnEncoded ? ( mutable ? 5600 : 6972 ) : 5540, rs.getInt(3));
assertEquals(oneCellPerColFamilyStorageScheme ? 6 : 5, rs.getInt(4));
assertTrue(rs.next());
assertEquals("C", rs.getString(1));
assertEquals(24, rs.getInt(2));
assertEquals(columnEncoded ? ( mutable ? 6724 : 6988 ) : 6652, rs.getInt(3));
assertEquals(6, rs.getInt(4));
assertTrue(rs.next());
assertEquals("D", rs.getString(1));
assertEquals(24, rs.getInt(2));
assertEquals(columnEncoded ? ( mutable ? 6724 : 6988 ) : 6652, rs.getInt(3));
assertEquals(6, rs.getInt(4));
assertFalse(rs.next());
// Disable stats
conn.createStatement().execute("ALTER TABLE " + fullTableName +
" SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=0");
TestUtil.analyzeTable(conn, fullTableName);
// Assert that there are no more guideposts
rs = conn.createStatement().executeQuery("SELECT count(1) FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME +
" WHERE " + PhoenixDatabaseMetaData.PHYSICAL_NAME + "='" + physicalTableName + "' AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NOT NULL");
assertTrue(rs.next());
assertEquals(0, rs.getLong(1));
assertFalse(rs.next());
rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName);
assertEquals("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName,
QueryUtil.getExplainPlan(rs));
}
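// Loads 26 rows, collects stats with a 20 byte guide post width, and then checks for a few
// random key ranges that the summed GUIDE_POSTS_ROW_COUNT and GUIDE_POSTS_WIDTH for column
// family C2 match the expected row count and per-row byte size.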
@Test
public void testRowCountAndByteCounts() throws SQLException {
Connection conn = getConnection();
String ddl = "CREATE TABLE " + fullTableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+ "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
+ "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + tableDDLOptions + " split on ('e','j','o')";
conn.createStatement().execute(ddl);
String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r",
"s", "t", "u", "v", "w", "x", "y", "z" };
for (int i = 0; i < 26; i++) {
conn.createStatement().execute(
"UPSERT INTO " + fullTableName + " values('" + strings[i] + "'," + i + "," + (i + 1) + ","
+ (i + 2) + ",'" + strings[25 - i] + "')");
}
conn.commit();
ResultSet rs;
String query = "UPDATE STATISTICS " + fullTableName + " SET \""
+ QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(20);
conn.createStatement().execute(query);
Random r = new Random();
int count = 0;
while (count < 4) {
int startIndex = r.nextInt(strings.length);
int endIndex = r.nextInt(strings.length - startIndex) + startIndex;
long rows = endIndex - startIndex;
long c2Bytes = rows * (columnEncoded ? ( mutable ? 37 : 48 ) : 35);
String physicalTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(fullTableName), userTableNamespaceMapped).toString();
rs = conn.createStatement().executeQuery(
"SELECT COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH) from \"SYSTEM\".STATS where PHYSICAL_NAME = '"
+ physicalTableName + "' AND GUIDE_POST_KEY>= cast('" + strings[startIndex]
+ "' as varbinary) AND GUIDE_POST_KEY<cast('" + strings[endIndex]
+ "' as varbinary) and COLUMN_FAMILY='C2' group by COLUMN_FAMILY");
if (startIndex < endIndex) {
assertTrue(rs.next());
assertEquals("C2", rs.getString(1));
assertEquals(rows, rs.getLong(2));
assertEquals(c2Bytes, rs.getLong(3));
count++;
}
}
}
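// Creates a table with more KV columns than hbase.hstore.compaction.kv.max so that scanners
// return rows in multiple batches, then verifies the stats row count is correct both after
// UPDATE STATISTICS and after stats are regenerated by a major compaction.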
@Test
public void testRowCountWhenNumKVsExceedCompactionScannerThreshold() throws Exception {
String tableName = generateUniqueName();
StringBuilder sb = new StringBuilder(200);
sb.append("CREATE TABLE " + tableName + "(PK1 VARCHAR NOT NULL, ");
int numRows = 10;
try (Connection conn = DriverManager.getConnection(getUrl())) {
int compactionScannerKVThreshold =
conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX,
HConstants.COMPACTION_KV_MAX_DEFAULT);
int numKvColumns = compactionScannerKVThreshold * 2;
for (int i = 1; i <= numKvColumns; i++) {
sb.append("KV" + i + " VARCHAR");
if (i < numKvColumns) {
sb.append(", ");
}
}
sb.append(" CONSTRAINT PK PRIMARY KEY (PK1))");
String ddl = sb.toString();
conn.createStatement().execute(ddl);
sb = new StringBuilder(200);
sb.append("UPSERT INTO " + tableName + " VALUES (");
for (int i = 1; i <= numKvColumns + 1; i++) {
sb.append("?");
if (i < numKvColumns + 1) {
sb.append(", ");
}
}
sb.append(")");
String dml = sb.toString();
PreparedStatement stmt = conn.prepareStatement(dml);
String keyValue = "KVVVVVV";
for (int j = 1; j <= numRows; j++) {
for (int i = 1; i <= numKvColumns + 1; i++) {
if (i == 1) {
stmt.setString(1, "" + j);
} else {
stmt.setString(i, keyValue);
}
}
stmt.executeUpdate();
}
conn.commit();
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
String q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'";
ResultSet rs = conn.createStatement().executeQuery(q);
rs.next();
assertEquals("Number of expected rows in stats table after update stats didn't match!", numRows, rs.getInt(1));
conn.createStatement().executeUpdate("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'");
conn.commit();
TestUtil.doMajorCompaction(conn, tableName);
q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'";
rs = conn.createStatement().executeQuery(q);
rs.next();
assertEquals("Number of expected rows in stats table after major compaction didn't match", numRows, rs.getInt(1));
}
}
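// When the data in a column family is smaller than GUIDE_POSTS_WIDTH, an "empty" guidepost
// is still written per column family; it carries the configured width as its byte count and
// a valid collection timestamp.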
@Test
public void testEmptyGuidePostGeneratedWhenDataSizeLessThanGPWidth() throws Exception {
String tableName = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl())) {
long guidePostWidth = 20000000;
conn.createStatement()
.execute("CREATE TABLE " + tableName
+ " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k)) GUIDE_POSTS_WIDTH="
+ guidePostWidth + ", SALT_BUCKETS = 4");
conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)");
conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)");
conn.commit();
conn.createStatement().execute("UPDATE STATISTICS " + tableName);
ConnectionQueryServices queryServices =
conn.unwrap(PhoenixConnection.class).getQueryServices();
try (HTableInterface statsHTable =
queryServices.getTable(
SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
queryServices.getProps()).getName())) {
GuidePostsInfo gps =
StatisticsUtil.readStatistics(statsHTable,
new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C1")),
HConstants.LATEST_TIMESTAMP);
assertTrue(gps.isEmptyGuidePost());
assertEquals(guidePostWidth, gps.getByteCounts()[0]);
assertTrue(gps.getGuidePostTimestamps()[0] > 0);
gps =
StatisticsUtil.readStatistics(statsHTable,
new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C2")),
HConstants.LATEST_TIMESTAMP);
assertTrue(gps.isEmptyGuidePost());
assertEquals(guidePostWidth, gps.getByteCounts()[0]);
assertTrue(gps.getGuidePostTimestamps()[0] > 0);
}
}
}
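// Verifies that DefaultStatisticsCollector resolves the guide post width from the base table
// for the table itself, its global index, local index (same physical table), and view index,
// and that an ALTER TABLE ... SET GUIDE_POSTS_WIDTH change is picked up for all of them.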
@Test
public void testGuidePostWidthUsedInDefaultStatsCollector() throws Exception {
String baseTable = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl())) {
String ddl =
"CREATE TABLE " + baseTable
+ " (k INTEGER PRIMARY KEY, a bigint, b bigint, c bigint) "
+ tableDDLOptions;
BaseTest.createTestTable(getUrl(), ddl, null, null);
conn.createStatement().execute("upsert into " + baseTable + " values (100,1,1,1)");
conn.createStatement().execute("upsert into " + baseTable + " values (101,2,2,2)");
conn.createStatement().execute("upsert into " + baseTable + " values (102,3,3,3)");
conn.createStatement().execute("upsert into " + baseTable + " values (103,4,4,4)");
conn.createStatement().execute("upsert into " + baseTable + " values (104,5,5,5)");
conn.createStatement().execute("upsert into " + baseTable + " values (105,6,6,6)");
conn.createStatement().execute("upsert into " + baseTable + " values (106,7,7,7)");
conn.createStatement().execute("upsert into " + baseTable + " values (107,8,8,8)");
conn.createStatement().execute("upsert into " + baseTable + " values (108,9,9,9)");
conn.createStatement().execute("upsert into " + baseTable + " values (109,10,10,10)");
conn.commit();
DefaultStatisticsCollector statsCollector = getDefaultStatsCollectorForTable(baseTable);
statsCollector.init();
assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth());
// ok let's create a global index now and see what guide post width is used for it
String globalIndex = "GI_" + generateUniqueName();
ddl = "CREATE INDEX " + globalIndex + " ON " + baseTable + " (a) INCLUDE (b) ";
conn.createStatement().execute(ddl);
statsCollector = getDefaultStatsCollectorForTable(globalIndex);
statsCollector.init();
assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth());
// let's check out local index too
String localIndex = "LI_" + generateUniqueName();
ddl = "CREATE LOCAL INDEX " + localIndex + " ON " + baseTable + " (b) INCLUDE (c) ";
conn.createStatement().execute(ddl);
// local indexes reside in the same physical table as the base data table
statsCollector = getDefaultStatsCollectorForTable(baseTable);
statsCollector.init();
assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth());
// now let's create a view and an index on it and see what guide post width is used for
// it
String view = "V_" + generateUniqueName();
ddl = "CREATE VIEW " + view + " AS SELECT * FROM " + baseTable;
conn.createStatement().execute(ddl);
String viewIndex = "VI_" + generateUniqueName();
ddl = "CREATE INDEX " + viewIndex + " ON " + view + " (b)";
conn.createStatement().execute(ddl);
String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
statsCollector = getDefaultStatsCollectorForTable(viewIndexTableName);
statsCollector.init();
assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth());
/*
* Fantastic! Now let's change the guide post width of the base table. This should
* change the guide post width we are using in DefaultStatisticsCollector for all
* indexes too.
*/
long newGpWidth = 500;
conn.createStatement()
.execute("ALTER TABLE " + baseTable + " SET GUIDE_POSTS_WIDTH=" + newGpWidth);
// base table
statsCollector = getDefaultStatsCollectorForTable(baseTable);
statsCollector.init();
assertEquals(newGpWidth, statsCollector.getGuidePostDepth());
// global index table
statsCollector = getDefaultStatsCollectorForTable(globalIndex);
statsCollector.init();
assertEquals(newGpWidth, statsCollector.getGuidePostDepth());
// view index table
statsCollector = getDefaultStatsCollectorForTable(viewIndexTableName);
statsCollector.init();
assertEquals(newGpWidth, statsCollector.getGuidePostDepth());
}
}
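// Builds a DefaultStatisticsCollector against the coprocessor environment of the table's
// first online region, mirroring how stats are collected on the server side.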
private DefaultStatisticsCollector getDefaultStatsCollectorForTable(String tableName)
throws Exception {
RegionCoprocessorEnvironment env = getRegionEnvironment(tableName);
return (DefaultStatisticsCollector) StatisticsCollectorFactory
.createStatisticsCollector(env, tableName, System.currentTimeMillis(), null, null);
}
private RegionCoprocessorEnvironment getRegionEnvironment(String tableName)
throws IOException, InterruptedException {
return (RegionCoprocessorEnvironment) getUtility()
.getRSForFirstRegionInTable(TableName.valueOf(tableName))
.getOnlineRegions(TableName.valueOf(tableName)).get(0).getCoprocessorHost()
.findCoprocessorEnvironment(UngroupedAggregateRegionObserver.class.getName());
}
}