blob: 0071392c1e7845fc5ee4d9216cf188dfd5100bbb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.index;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.hadoop.hbase.TableName;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.primitives.Doubles;
@RunWith(Parameterized.class)
public class MutableIndexIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER = LoggerFactory.getLogger(MutableIndexIT.class);
protected final boolean localIndex;
private final String tableDDLOptions;
public MutableIndexIT(Boolean localIndex, String txProvider, Boolean columnEncoded) {
this.localIndex = localIndex;
StringBuilder optionBuilder = new StringBuilder();
if (txProvider != null) {
optionBuilder.append("TRANSACTIONAL=true," + PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + "='" + txProvider + "'");
}
if (!columnEncoded) {
if (optionBuilder.length()!=0)
optionBuilder.append(",");
optionBuilder.append("COLUMN_ENCODED_BYTES=0");
}
this.tableDDLOptions = optionBuilder.toString();
}
private static Connection getConnection(Properties props) throws SQLException {
props.setProperty(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(1));
Connection conn = DriverManager.getConnection(getUrl(), props);
return conn;
}
private static Connection getConnection() throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
return getConnection(props);
}
@Parameters(name="MutableIndexIT_localIndex={0},transactionProvider={1},columnEncoded={2}") // name is used by failsafe as file name in reports
public static synchronized Collection<Object[]> data() {
return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {
{ false, null, false }, { false, null, true },
{ false, "TEPHRA", false }, { false, "TEPHRA", true },
{ false, "OMID", false },
{ true, null, false }, { true, null, true },
{ true, "TEPHRA", false }, { true, "TEPHRA", true },
}),1);
}
@Test
public void testCoveredColumnUpdates() throws Exception {
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
TestUtil.createMultiCFTestTable(conn, fullTableName, tableDDLOptions);
populateMultiCFTestTable(fullTableName);
conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName
+ " (char_col1 ASC, int_col1 ASC) INCLUDE (long_col1, long_col2)");
String query = "SELECT char_col1, int_col1, long_col2 from " + fullTableName;
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
if (localIndex) {
assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName +" [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
} else {
assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
}
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(2, rs.getInt(2));
assertEquals(3L, rs.getLong(3));
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(3, rs.getInt(2));
assertEquals(4L, rs.getLong(3));
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(4, rs.getInt(2));
assertEquals(5L, rs.getLong(3));
assertFalse(rs.next());
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+ "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2*2 FROM "
+ fullTableName + " WHERE long_col2=?");
stmt.setLong(1,4L);
assertEquals(1,stmt.executeUpdate());
conn.commit();
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(2, rs.getInt(2));
assertEquals(3L, rs.getLong(3));
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(3, rs.getInt(2));
assertEquals(8L, rs.getLong(3));
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(4, rs.getInt(2));
assertEquals(5L, rs.getLong(3));
assertFalse(rs.next());
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+ "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, CAST(null AS BIGINT) FROM "
+ fullTableName + " WHERE long_col2=?");
stmt.setLong(1,3L);
assertEquals(1,stmt.executeUpdate());
conn.commit();
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(2, rs.getInt(2));
assertEquals(0, rs.getLong(3));
assertTrue(rs.wasNull());
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(3, rs.getInt(2));
assertEquals(8L, rs.getLong(3));
assertTrue(rs.next());
assertEquals("chara", rs.getString(1));
assertEquals(4, rs.getInt(2));
assertEquals(5L, rs.getLong(3));
assertFalse(rs.next());
if(localIndex) {
query = "SELECT b.* from " + fullTableName + " where int_col1 = 4 AND char_col1 = 'chara'";
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName +" [1,'chara',4]\n" +
"CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("varchar_b", rs.getString(1));
assertEquals("charb", rs.getString(2));
assertEquals(5, rs.getInt(3));
assertEquals(5, rs.getLong(4));
assertFalse(rs.next());
}
}
}
@Test
public void testUpsertIntoViewOnTableWithIndex() throws Exception {
String baseTable = generateUniqueName();
String view = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl())) {
String baseTableDDL = "CREATE TABLE IF NOT EXISTS " + baseTable +
" (ID VARCHAR PRIMARY KEY, V1 VARCHAR)";
conn.createStatement().execute(baseTableDDL);
// Create an Index on the base table
String tableIndex = generateUniqueName() + "_IDX";
conn.createStatement().execute("CREATE INDEX " + tableIndex +
" ON " + baseTable + " (V1)");
// Create a view on the base table
String viewDDL = "CREATE VIEW IF NOT EXISTS " + view
+ " (V2 INTEGER) AS SELECT * FROM " + baseTable
+ " WHERE ID='a'";
conn.createStatement().execute(viewDDL);
String upsert = "UPSERT INTO " + view + " (ID, V1, V2) "
+ "VALUES ('a' ,'ab', 7)";
conn.createStatement().executeUpdate(upsert);
conn.commit();
ResultSet rs = conn.createStatement().executeQuery("SELECT ID, V1 from " + baseTable);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertEquals("ab", rs.getString(2));
}
}
@Test
public void testCoveredColumns() throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
String query;
ResultSet rs;
conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1,"a");
stmt.setString(2, "x");
stmt.setString(3, "1");
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("x",rs.getString(1));
assertEquals("a",rs.getString(2));
assertEquals("1",rs.getString(3));
assertFalse(rs.next());
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
stmt.setString(1,"a");
stmt.setString(2, null);
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("x",rs.getString(1));
assertEquals("a",rs.getString(2));
assertNull(rs.getString(3));
assertFalse(rs.next());
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
if(localIndex) {
assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
} else {
assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
}
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("a",rs.getString(1));
assertEquals("x",rs.getString(2));
assertNull(rs.getString(3));
assertFalse(rs.next());
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
stmt.setString(1,"a");
stmt.setString(2,"3");
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
if(localIndex) {
assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName + " [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
} else {
assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
}
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("a",rs.getString(1));
assertEquals("x",rs.getString(2));
assertEquals("3",rs.getString(3));
assertFalse(rs.next());
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
stmt.setString(1,"a");
stmt.setString(2,"4");
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
if(localIndex) {
assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
} else {
assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
}
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("a",rs.getString(1));
assertEquals("x",rs.getString(2));
assertEquals("4",rs.getString(3));
assertFalse(rs.next());
}
}
@Test
public void testCompoundIndexKey() throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
String query;
ResultSet rs;
// make sure that the tables are empty, but reachable
conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
// load some data into the table
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1,"a");
stmt.setString(2, "x");
stmt.setString(3, "1");
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("x",rs.getString(1));
assertEquals("1",rs.getString(2));
assertEquals("a",rs.getString(3));
assertFalse(rs.next());
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1,"a");
stmt.setString(2, "y");
stmt.setString(3, null);
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("y",rs.getString(1));
assertNull(rs.getString(2));
assertEquals("a",rs.getString(3));
assertFalse(rs.next());
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
if (localIndex) {
assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\n"
+ " SERVER FILTER BY FIRST KEY ONLY\n"
+ "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
} else {
assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+ " SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
}
//make sure the data table looks like what we expect
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("a",rs.getString(1));
assertEquals("y",rs.getString(2));
assertNull(rs.getString(3));
assertFalse(rs.next());
// Upsert new row with null leading index column
stmt.setString(1,"b");
stmt.setString(2, null);
stmt.setString(3, "3");
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals(null,rs.getString(1));
assertEquals("3",rs.getString(2));
assertEquals("b",rs.getString(3));
assertTrue(rs.next());
assertEquals("y",rs.getString(1));
assertNull(rs.getString(2));
assertEquals("a",rs.getString(3));
assertFalse(rs.next());
// Update row with null leading index column to have a value
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?)");
stmt.setString(1,"b");
stmt.setString(2, "z");
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("y",rs.getString(1));
assertNull(rs.getString(2));
assertEquals("a",rs.getString(3));
assertTrue(rs.next());
assertEquals("z",rs.getString(1));
assertEquals("3",rs.getString(2));
assertEquals("b",rs.getString(3));
assertFalse(rs.next());
}
}
/**
* There was a case where if there were multiple updates to a single row in the same batch, the
* index wouldn't be updated correctly as each element of the batch was evaluated with the state
* previous to the batch, rather than with the rest of the batch. This meant you could do a put
* and a delete on a row in the same batch and the index result would contain the current + put
* and current + delete, but not current + put + delete.
* @throws Exception on failure
*/
@Test
public void testMultipleUpdatesToSingleRow() throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
String query;
ResultSet rs;
// make sure that the tables are empty, but reachable
conn.createStatement().execute(
"CREATE TABLE " + fullTableName
+ " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
// load some data into the table
PreparedStatement stmt =
conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "a");
stmt.setString(2, "x");
stmt.setString(3, "1");
stmt.execute();
conn.commit();
// make sure the index is working as expected
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("x", rs.getString(1));
assertEquals("1", rs.getString(2));
assertEquals("a", rs.getString(3));
assertFalse(rs.next());
// do multiple updates to the same row, in the same batch
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, v1) VALUES(?,?)");
stmt.setString(1, "a");
stmt.setString(2, "y");
stmt.execute();
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
stmt.setString(1, "a");
stmt.setString(2, null);
stmt.execute();
conn.commit();
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("y", rs.getString(1));
assertNull(rs.getString(2));
assertEquals("a", rs.getString(3));
assertFalse(rs.next());
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
if(localIndex) {
assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullTableName+" [1]\n"
+ " SERVER FILTER BY FIRST KEY ONLY\n"
+ "CLIENT MERGE SORT",
QueryUtil.getExplainPlan(rs));
} else {
assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+ " SERVER FILTER BY FIRST KEY ONLY",
QueryUtil.getExplainPlan(rs));
}
// check that the data table matches as expected
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertEquals("y", rs.getString(2));
assertNull(rs.getString(3));
assertFalse(rs.next());
}
}
@Test
public void testUpsertingNullForIndexedColumns() throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
String testTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName + "_" + System.currentTimeMillis());
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
ResultSet rs;
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE " + testTableName + "(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR) "+tableDDLOptions);
stmt.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + testTableName + " (v2) INCLUDE(v3)");
//create a row with value null for indexed column v2
stmt.executeUpdate("upsert into " + testTableName + " values('cc1', null, 'abc')");
conn.commit();
//assert values in index table
rs = stmt.executeQuery("select * from " + fullIndexName);
assertTrue(rs.next());
assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
assertTrue(rs.wasNull());
assertEquals("cc1", rs.getString(2));
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
//assert values in data table
rs = stmt.executeQuery("select v1, v2, v3 from " + testTableName);
assertTrue(rs.next());
assertEquals("cc1", rs.getString(1));
assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
assertTrue(rs.wasNull());
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
//update the previously null value for indexed column v2 to a non-null value 1.23
stmt.executeUpdate("upsert into " + testTableName + " values('cc1', 1.23, 'abc')");
conn.commit();
//assert values in data table
rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from " + testTableName);
assertTrue(rs.next());
assertEquals("cc1", rs.getString(1));
assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
//assert values in index table
rs = stmt.executeQuery("select * from " + fullIndexName);
assertTrue(rs.next());
assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
assertEquals("cc1", rs.getString(2));
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
//update the value for indexed column v2 back to null
stmt.executeUpdate("upsert into " + testTableName + " values('cc1', null, 'abc')");
conn.commit();
//assert values in index table
rs = stmt.executeQuery("select * from " + fullIndexName);
assertTrue(rs.next());
assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
assertTrue(rs.wasNull());
assertEquals("cc1", rs.getString(2));
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
//assert values in data table
rs = stmt.executeQuery("select v1, v2, v3 from " + testTableName);
assertTrue(rs.next());
assertEquals("cc1", rs.getString(1));
assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
}
}
private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
}
@Test
public void testAlterTableWithImmutability() throws Exception {
if (localIndex)
return;
String query;
ResultSet rs;
String tableName = "TBL_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
conn.createStatement().execute(
"CREATE TABLE " + fullTableName +" (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR) " + tableDDLOptions);
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
assertImmutableRows(conn,fullTableName, false);
conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET IMMUTABLE_ROWS=true");
assertImmutableRows(conn,fullTableName, true);
conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET immutable_rows=false");
assertImmutableRows(conn,fullTableName, false);
}
}
private void createBaseTable(Connection conn, String tableName, String splits) throws SQLException {
String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
"k1 INTEGER NOT NULL,\n" +
"k2 INTEGER NOT NULL,\n" +
"k3 INTEGER,\n" +
"v1 VARCHAR,\n" +
"CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
+ (tableDDLOptions!=null?tableDDLOptions:"") + (splits != null ? (" split on " + splits) : "");
conn.createStatement().execute(ddl);
}
@Test
public void testTenantSpecificConnection() throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
// create data table
conn.createStatement().execute(
"CREATE TABLE IF NOT EXISTS " + fullTableName +
"(TENANT_ID CHAR(15) NOT NULL,"+
"TYPE VARCHAR(25),"+
"ENTITY_ID CHAR(15) NOT NULL,"+
"CONSTRAINT PK_CONSTRAINT PRIMARY KEY (TENANT_ID, ENTITY_ID)) MULTI_TENANT=TRUE "
+ (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "") );
// create index
conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + fullTableName + " (ENTITY_ID, TYPE)");
// upsert rows
String dml = "UPSERT INTO " + fullTableName + " (ENTITY_ID, TYPE) VALUES ( ?, ?)";
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "tenant1");
// connection is tenant-specific
try (Connection tenantConn = getConnection(props)) {
// upsert one row
upsertRow(dml, tenantConn, 0);
tenantConn.commit();
ResultSet rs = tenantConn.createStatement().executeQuery("SELECT ENTITY_ID FROM " + fullTableName + " ORDER BY TYPE LIMIT 5");
assertTrue(rs.next());
// upsert two rows which ends up using the tenant cache
upsertRow(dml, tenantConn, 1);
upsertRow(dml, tenantConn, 2);
tenantConn.commit();
}
}
}
@Test
public void testUpsertingDeletedRowShouldGiveProperDataWithIndexes() throws Exception {
testUpsertingDeletedRowShouldGiveProperDataWithIndexes(false);
}
@Test
public void testUpsertingDeletedRowShouldGiveProperDataWithMultiCFIndexes() throws Exception {
testUpsertingDeletedRowShouldGiveProperDataWithIndexes(true);
}
private void testUpsertingDeletedRowShouldGiveProperDataWithIndexes(boolean multiCf) throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String columnFamily1 = "cf1";
String columnFamily2 = "cf2";
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
try (Connection conn = getConnection()) {
conn.createStatement().execute(
"create table " + fullTableName + " (id integer primary key, "
+ (multiCf ? columnFamily1 + "." : "") + "f float, "
+ (multiCf ? columnFamily2 + "." : "") + "s varchar)" + tableDDLOptions);
conn.createStatement().execute(
"create " + (localIndex ? "LOCAL" : "") + " index " + indexName + " on " + fullTableName + " ("
+ (multiCf ? columnFamily1 + "." : "") + "f) include ("+(multiCf ? columnFamily2 + "." : "") +"s)");
conn.createStatement().execute(
"upsert into " + fullTableName + " values (1, 0.5, 'foo')");
conn.commit();
conn.createStatement().execute("delete from " + fullTableName + " where id = 1");
conn.commit();
conn.createStatement().execute(
"upsert into " + fullTableName + " values (1, 0.5, 'foo')");
conn.commit();
ResultSet rs = conn.createStatement().executeQuery("select * from "+fullIndexName);
assertTrue(rs.next());
assertEquals(1, rs.getInt(2));
assertEquals(0.5F, rs.getFloat(1), 0.0);
assertEquals("foo", rs.getString(3));
}
}
@Test
public void testUpsertingDeletedRowWithNullCoveredColumn() throws Exception {
testUpsertingDeletedRowWithNullCoveredColumn(false);
}
@Test
public void testUpsertingDeletedRowWithNullCoveredColumnMultiCfs() throws Exception {
testUpsertingDeletedRowWithNullCoveredColumn(true);
}
public void testUpsertingDeletedRowWithNullCoveredColumn(boolean multiCf) throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String columnFamily1 = "cf1";
String columnFamily2 = "cf2";
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
try (Connection conn = getConnection()) {
conn.createStatement()
.execute("create table " + fullTableName + " (id integer primary key, "
+ (multiCf ? columnFamily1 + "." : "") + "f varchar, "
+ (multiCf ? columnFamily2 + "." : "") + "s varchar)"
+ tableDDLOptions);
conn.createStatement()
.execute("create " + (localIndex ? "LOCAL" : "") + " index " + indexName
+ " on " + fullTableName + " (" + (multiCf ? columnFamily1 + "." : "")
+ "f) include (" + (multiCf ? columnFamily2 + "." : "") + "s)");
conn.createStatement()
.execute("upsert into " + fullTableName + " values (1, 'foo', 'bar')");
conn.commit();
conn.createStatement().execute("delete from " + fullTableName + " where id = 1");
conn.commit();
conn.createStatement()
.execute("upsert into " + fullTableName + " values (1, null, 'bar')");
conn.commit();
ResultSet rs = conn.createStatement().executeQuery("select * from " + fullIndexName);
assertTrue(rs.next());
assertEquals(1, rs.getInt(2));
assertEquals(null, rs.getString(1));
assertEquals("bar", rs.getString(3));
}
}
/**
* PHOENIX-4988
* Test updating only a non-indexed column after two successive deletes to an indexed row
*/
@Test
public void testUpdateNonIndexedColumn() throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
try (Connection conn = getConnection()) {
conn.setAutoCommit(false);
conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v2)");
conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1,v2) VALUES ('testKey','v1_1','v2_1')");
conn.commit();
conn.createStatement().executeUpdate("DELETE FROM " + fullTableName);
conn.commit();
conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1,v2) VALUES ('testKey','v1_2','v2_2')");
conn.commit();
conn.createStatement().executeUpdate("DELETE FROM " + fullTableName);
conn.commit();
conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1) VALUES ('testKey','v1_3')");
conn.commit();
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
// PHOENIX-4980
// When there is a flush after a data table update of non-indexed columns, the
// index gets out of sync on the next write
getUtility().getHBaseAdmin().flush(TableName.valueOf(fullTableName));
conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName + "(k,v1,v2) VALUES ('testKey','v1_4','v2_3')");
conn.commit();
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
}
private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException {
PreparedStatement stmt = tenantConn.prepareStatement(dml);
stmt.setString(1, "00000000000000" + String.valueOf(i));
stmt.setString(2, String.valueOf(i));
assertEquals(1,stmt.executeUpdate());
}
}