blob: 1d6763a484d21f41aad97bda0f1b3d405b52ebb9 [file] [log] [blame]
package org.apache.phoenix.end2end.index;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Properties;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class SingleCellIndexIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER = LoggerFactory.getLogger(SingleCellIndexIT.class);
private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
private boolean mutable;
private final String tableDDLOptions;
@Parameterized.Parameters(name = "mutable={0}")
public static synchronized Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true},
{false} });
}
@Before
public void setupBefore() {
testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
}
public SingleCellIndexIT(boolean mutable) {
StringBuilder optionBuilder = new StringBuilder();
this.mutable = mutable;
optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN ");
if (!mutable) {
optionBuilder.append(", IMMUTABLE_ROWS=true ");
}
this.tableDDLOptions = optionBuilder.toString();
}
@Test
public void testCreateOneCellTableAndSingleCellIndex() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
String tableName = "TBL_" + generateUniqueName();
String idxName = "IND_" + generateUniqueName();
createTableAndIndex(conn, tableName, idxName, this.tableDDLOptions, 3);
assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
dumpTable(tableName);
String selectFromData = "SELECT /*+ NO_INDEX */ PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where V2 >= 3 and V4 LIKE 'V4%'";
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromData);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertTrue(actualExplainPlan.contains(tableName));
rs = conn.createStatement().executeQuery(selectFromData);
assertTrue(rs.next());
assertEquals("PK2", rs.getString(1));
assertEquals(2, rs.getInt(2));
assertEquals("V12", rs.getString(3));
assertEquals(3, rs.getInt(4));
assertEquals("V42", rs.getString(5));
assertTrue(rs.next());
String selectFromIndex = "SELECT PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where V2 >= 3 and V4 LIKE 'V4%'";
rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertTrue(actualExplainPlan.contains(idxName));
rs = conn.createStatement().executeQuery(selectFromIndex);
assertTrue(rs.next());
assertEquals("PK2", rs.getString(1));
assertEquals(2, rs.getInt(2));
assertEquals("V12", rs.getString(3));
assertEquals(3, rs.getInt(4));
assertEquals("V42", rs.getString(5));
assertTrue(rs.next());
assertEquals("PK3", rs.getString(1));
assertEquals(3, rs.getInt(2));
assertEquals("V13", rs.getString(3));
assertEquals(4, rs.getInt(4));
assertEquals("V43", rs.getString(5));
assertFalse(rs.next());
}
}
@Test
public void testAddColumns() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
String tableName = "TBL_" + generateUniqueName();
String idxName = "IND_" + generateUniqueName();
createTableAndIndex(conn, tableName, idxName, null, 3);
assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
String alterTable = "ALTER TABLE " + tableName + " ADD V_NEW VARCHAR CASCADE INDEX ALL";
conn.createStatement().execute(alterTable);
String upsert = "UPSERT INTO " + tableName + " (PK1,INT_PK,V1, V2, V3, V4, V5, V_NEW) VALUES ('PK99',99,'V199',100,101,'V499','V699','V_NEW99')";
conn.createStatement().executeUpdate(upsert);
conn.commit();
String selectFromIndex = "SELECT PK1, INT_PK, V1, V2, V4, V_NEW FROM " + tableName + " where V1='V199' AND V2=100";
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertTrue(actualExplainPlan.contains(idxName));
rs = conn.createStatement().executeQuery(selectFromIndex);
assertTrue(rs.next());
assertEquals("PK99", rs.getString(1));
assertEquals(99, rs.getInt(2));
assertEquals("V199", rs.getString(3));
assertEquals(100, rs.getInt(4));
assertEquals("V499", rs.getString(5));
assertEquals("V_NEW99", rs.getString(6));
assertFalse(rs.next());
}
}
@Test
public void testDropColumns() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
String tableName = "TBL_" + generateUniqueName();
String idxName = "IND_" + generateUniqueName();
createTableAndIndex(conn, tableName, idxName, null, 1);
assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
String alterTable = "ALTER TABLE " + tableName + " DROP COLUMN V2";
conn.createStatement().execute(alterTable);
String star = "SELECT * FROM " + idxName ;
ResultSet rs = conn.createStatement().executeQuery(star);
assertTrue(rs.next());
assertEquals("PK1", rs.getString(1));
assertEquals(1, rs.getInt(2));
assertEquals("V11", rs.getString(3));
assertEquals("V41", rs.getString(4));
assertFalse(rs.next());
String selectFromIndex = "SELECT PK1, INT_PK, V1, V4 FROM " + tableName + " where V1='V11'";
rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertTrue(actualExplainPlan.contains(idxName));
rs = conn.createStatement().executeQuery(selectFromIndex);
assertTrue(rs.next());
assertEquals("PK1", rs.getString(1));
assertEquals(1, rs.getInt(2));
assertEquals("V11", rs.getString(3));
assertEquals("V41", rs.getString(4));
assertFalse(rs.next());
}
}
@Test
public void testTenantViewIndexes() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
String tableName = "TBL_" + generateUniqueName();
String view1IndexName = "IND_" + generateUniqueName();
String divergedViewIndex = "DIND_" + generateUniqueName();
String view3IndexName = "IND_" + generateUniqueName();
String view1 = "V_" + generateUniqueName();
String divergedView = "V_" + generateUniqueName();
String view3 = "V_" + generateUniqueName();
String baseTableDDL = "CREATE TABLE " + tableName + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR, V3 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true"
+ (this.mutable ? "": ", IMMUTABLE_ROWS=true");
conn.createStatement().execute(baseTableDDL);
assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
try (Connection tenant1Conn = getTenantConnection("tenant1")) {
String
view1DDL =
"CREATE VIEW " + view1 + " ( VIEW_COL1 VARCHAR, VIEW_COL2 VARCHAR) AS SELECT * FROM "
+ tableName;
tenant1Conn.createStatement().execute(view1DDL);
String indexDDL = "CREATE INDEX " + view1IndexName + " ON " + view1 + " (V1) include (VIEW_COL2, V3) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,COLUMN_ENCODED_BYTES=2";
tenant1Conn.createStatement().execute(indexDDL);
tenant1Conn.commit();
}
try (Connection tenant2Conn = getTenantConnection("tenant2")) {
String
divergedViewDDL =
"CREATE VIEW " + divergedView + " ( VIEW_COL1 VARCHAR, VIEW_COL2 VARCHAR) AS SELECT * FROM "
+ tableName ;
tenant2Conn.createStatement().execute(divergedViewDDL);
// Drop column V2 from the view to have it diverge from the base table
tenant2Conn.createStatement().execute("ALTER VIEW " + divergedView + " DROP COLUMN V2");
// create an index on the diverged view
String indexDDL = "CREATE INDEX " + divergedViewIndex + " ON " + divergedView + " (V1) include (VIEW_COL1, V3) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS";
tenant2Conn.createStatement().execute(indexDDL);
tenant2Conn.commit();
}
try (Connection tenant3Conn = getTenantConnection("tenant3")) {
String
view3DDL =
"CREATE VIEW " + view3 + " ( VIEW_COL31 VARCHAR, VIEW_COL32 VARCHAR) AS SELECT * FROM "
+ tableName;
tenant3Conn.createStatement().execute(view3DDL);
String indexDDL = "CREATE INDEX " + view3IndexName + " ON " + view3 + " (V1) include (VIEW_COL32, V3)";
tenant3Conn.createStatement().execute(indexDDL);
tenant3Conn.commit();
}
String upsert = "UPSERT INTO " + view1 + " (PK1, V1, V2, V3, VIEW_COL1, VIEW_COL2) VALUES ('PK1', 'V1', 'V2', 'V3','VIEW_COL1_1','VIEW_COL2_1')";
try (Connection viewConn = getTenantConnection("tenant1")) {
viewConn.createStatement().executeUpdate(upsert);
viewConn.commit();
Statement stmt = viewConn.createStatement();
String sql = "SELECT V3, VIEW_COL2 FROM " + view1 + " WHERE V1 = 'V1'";
QueryPlan plan = stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql);
assertTrue(plan.getTableRef().getTable().getName().getString().equals(SchemaUtil.normalizeIdentifier(view1IndexName)));
ResultSet rs = viewConn.createStatement().executeQuery(sql);
assertTrue(rs.next());
assertEquals("V3", rs.getString(1));
assertEquals("VIEW_COL2_1", rs.getString(2));
}
// Upsert records in diverged view. Verify that the PK column was added to the index on it.
upsert = "UPSERT INTO " + divergedView + " (PK1, V1, V3, VIEW_COL1, VIEW_COL2) VALUES ('PK1', 'V1', 'V3','VIEW_COL21_1','VIEW_COL22_1')";
try (Connection viewConn = getTenantConnection("tenant2")) {
viewConn.createStatement().executeUpdate(upsert);
viewConn.commit();
Statement stmt = viewConn.createStatement();
String sql = "SELECT V3, VIEW_COL1 FROM " + divergedView + " WHERE V1 = 'V1'";
QueryPlan plan = stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql);
assertTrue(plan.getTableRef().getTable().getName().getString().equals(SchemaUtil.normalizeIdentifier(divergedViewIndex)));
ResultSet rs = viewConn.createStatement().executeQuery(sql);
assertTrue(rs.next());
assertEquals("V3", rs.getString(1));
assertEquals("VIEW_COL21_1", rs.getString(2));
}
upsert = "UPSERT INTO " + view3 + " (PK1, V1, V2, V3, VIEW_COL31, VIEW_COL32) VALUES ('PK1', 'V1', 'V2', 'V3','VIEW_COL31_1','VIEW_COL32_1')";
try (Connection viewConn = getTenantConnection("tenant3")) {
viewConn.createStatement().executeUpdate(upsert);
viewConn.commit();
Statement stmt = viewConn.createStatement();
String sql = "SELECT V3, VIEW_COL32 FROM " + view3 + " WHERE V1 = 'V1'";
QueryPlan plan = stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql);
assertTrue(plan.getTableRef().getTable().getName().getString().equals(SchemaUtil.normalizeIdentifier(view3IndexName)));
ResultSet rs = viewConn.createStatement().executeQuery(sql);
assertTrue(rs.next());
assertEquals("V3", rs.getString(1));
assertEquals("VIEW_COL32_1", rs.getString(2));
}
dumpTable("_IDX_" + tableName);
}
}
private Connection getTenantConnection(String tenantId) throws Exception {
Properties tenantProps = PropertiesUtil.deepCopy(testProps);
tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
return DriverManager.getConnection(getUrl(), tenantProps);
}
private void assertMetadata(Connection conn, PTable.ImmutableStorageScheme expectedStorageScheme, PTable.QualifierEncodingScheme
expectedColumnEncoding, String tableName)
throws Exception {
PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), tableName));
assertEquals(expectedStorageScheme, table.getImmutableStorageScheme());
assertEquals(expectedColumnEncoding, table.getEncodingScheme());
}
private void createTableAndIndex(Connection conn, String tableName, String indexName, String tableDDL, int numOfRows)
throws SQLException {
String createTableSql = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, V1 VARCHAR, V2 INTEGER, V3 INTEGER, V4 VARCHAR, V5 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) "
+ (tableDDL == null ? "" : tableDDL);
LOGGER.debug(createTableSql);
conn.createStatement().execute(createTableSql);
String createIndexSql = "CREATE INDEX " + indexName + " ON " + tableName + " (PK1, INT_PK) include (V1,V2,V4) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2";
conn.createStatement().execute(createIndexSql);
String upsert = "UPSERT INTO " + tableName + " (PK1, INT_PK, V1, V2, V3, V4, V5) VALUES (?,?,?,?,?,?,?)";
PreparedStatement upsertStmt = conn.prepareStatement(upsert);
for (int i=1; i <= numOfRows; i++) {
upsertStmt.setString(1, "PK"+i);
upsertStmt.setInt(2, i);
upsertStmt.setString(3, "V1"+i);
upsertStmt.setInt(4, i+1);
upsertStmt.setInt(5, i+2);
upsertStmt.setString(6, "V4"+i);
upsertStmt.setString(7, "V5"+i);
upsertStmt.executeUpdate();
}
}
public static void dumpTable(String tableName) throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
Table
hTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName.getBytes());
Scan scan = new Scan();
scan.setRaw(true);
LOGGER.debug("***** Table Name : " + tableName);
ResultScanner scanner = hTable.getScanner(scan);
for (Result result = scanner.next(); result != null; result = scanner.next()) {
for (Cell cell : result.rawCells()) {
String cellString = cell.toString();
for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
.entrySet()) {
byte[] family = entryF.getKey();
}
LOGGER.debug(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
}
}
}
}
}