blob: 30cae3614f80a54373deddc40255b973c5ea3344 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.mapreduce;
import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* Tests for the {@link IndexTool}
*/
@Category(NeedsOwnMiniClusterTest.class)
public class IndexToolIT {
private static HBaseTestingUtility hbaseTestUtil;
private static String zkQuorum;
@BeforeClass
public static void setUp() throws Exception {
hbaseTestUtil = new HBaseTestingUtility();
Configuration conf = hbaseTestUtil.getConfiguration();
conf.setBoolean("hbase.defaults.for.version.skip", true);
setUpConfigForMiniCluster(conf);
hbaseTestUtil.startMiniCluster();
hbaseTestUtil.startMiniMapReduceCluster();
Class.forName(PhoenixDriver.class.getName());
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
}
@Test
public void testImmutableGlobalIndex() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false);
}
@Test
public void testImmutableLocalIndex() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true);
}
@Test
public void testMutableGlobalIndex() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false);
}
@Test
public void testMutableLocalIndex() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true);
}
@Test
public void testImmutableGlobalIndexDirectApi() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true);
}
@Test
public void testImmutableLocalIndexDirectApi() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true);
}
@Test
public void testMutableGlobalIndexDirectApi() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true);
}
@Test
public void testMutableLocalIndexDirectApi() throws Exception {
testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true);
}
public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception {
testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false);
}
public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception {
final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
final String indxTable = String.format("%s_%s",dataTable,"INDX");
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
Statement stmt = conn.createStatement();
try {
stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
int id = 1;
// insert two rows
upsertRow(stmt1, id++);
upsertRow(stmt1, id++);
conn.commit();
stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName));
//verify rows are fetched from data table.
String selectSql = String.format("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM %s", fullTableName);
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
//assert we are pulling from data table.
assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", fullTableName), actualExplainPlan);
rs = stmt1.executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("xxUNAME1_xyz", rs.getString(1));
assertTrue(rs.next());
assertEquals("xxUNAME2_xyz", rs.getString(1));
//run the index MR job.
final IndexTool indexingTool = new IndexTool();
indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi);
int status = indexingTool.run(cmdArgs);
assertEquals(0, status);
// insert two more rows
upsertRow(stmt1, 3);
upsertRow(stmt1, 4);
conn.commit();
rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable));
//assert we are pulling from index table.
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal);
rs = stmt.executeQuery(selectSql);
// assertTrue(rs.next());
// assertEquals("xxUNAME1_xyz", rs.getString(1));
// assertEquals(1, rs.getInt(2));
//
// assertTrue(rs.next());
// assertEquals("xxUNAME2_xyz", rs.getString(1));
// assertEquals(2, rs.getInt(2));
//
// assertTrue(rs.next());
// assertEquals("xxUNAME3_xyz", rs.getString(1));
// assertEquals(3, rs.getInt(2));
//
// assertTrue(rs.next());
// assertEquals("xxUNAME4_xyz", rs.getString(1));
// assertEquals(4, rs.getInt(2));
//
// assertFalse(rs.next());
conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , fullTableName));
} finally {
conn.close();
}
}
/**
* This test is to assert that updates that happen to rows of a mutable table after an index is created in ASYNC mode and before
* the MR job runs, do show up in the index table .
* @throws Exception
*/
@Test
public void testMutalbleIndexWithUpdates() throws Exception {
final String dataTable = "DATA_TABLE5";
final String indxTable = String.format("%s_%s",dataTable,"INDX");
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
Statement stmt = conn.createStatement();
try {
stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
int id = 1;
// insert two rows
upsertRow(stmt1, id++);
upsertRow(stmt1, id++);
conn.commit();
stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
//update a row
stmt1.setInt(1, 1);
stmt1.setString(2, "uname" + String.valueOf(10));
stmt1.setInt(3, 95050 + 1);
stmt1.executeUpdate();
conn.commit();
//verify rows are fetched from data table.
String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
//assert we are pulling from data table.
assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
rs = stmt1.executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("UNAME10", rs.getString(1));
assertTrue(rs.next());
assertEquals("UNAME2", rs.getString(1));
//run the index MR job.
final IndexTool indexingTool = new IndexTool();
indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
final String[] cmdArgs = getArgValues(null, dataTable,indxTable);
int status = indexingTool.run(cmdArgs);
assertEquals(0, status);
//assert we are pulling from index table.
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
rs = stmt.executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("UNAME10", rs.getString(1));
assertEquals(1, rs.getInt(2));
assertTrue(rs.next());
assertEquals("UNAME2", rs.getString(1));
assertEquals(2, rs.getInt(2));
conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable));
} finally {
conn.close();
}
}
private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
String indxTable, boolean isLocal) {
String expectedExplainPlan = "";
if(isLocal) {
final String localIndexName = MetaDataUtil.getLocalIndexTableName(SchemaUtil.getTableName(schemaName, dataTable));
expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
+ "\n SERVER FILTER BY FIRST KEY ONLY", localIndexName);
} else {
expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
+ "\n SERVER FILTER BY FIRST KEY ONLY",SchemaUtil.getTableName(schemaName, indxTable));
}
assertEquals(expectedExplainPlan,actualExplainPlan);
}
private String[] getArgValues(String schemaName, String dataTable, String indxTable) {
return getArgValues(schemaName, dataTable, indxTable, false);
}
private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
final List<String> args = Lists.newArrayList();
if (schemaName!=null) {
args.add("-s");
args.add(schemaName);
}
args.add("-dt");
args.add(dataTable);
args.add("-it");
args.add(indxTable);
if(directApi) {
args.add("-direct");
// Need to run this job in foreground for the test to be deterministic
args.add("-runfg");
}
args.add("-op");
args.add("/tmp/"+UUID.randomUUID().toString());
return args.toArray(new String[0]);
}
private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
// insert row
stmt.setInt(1, i);
stmt.setString(2, "uname" + String.valueOf(i));
stmt.setInt(3, 95050 + i);
stmt.executeUpdate();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
try {
DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
} finally {
try {
hbaseTestUtil.shutdownMiniMapReduceCluster();
} finally {
hbaseTestUtil.shutdownMiniCluster();
}
}
}
}