blob: 30275544fec753e8b2b1c52ff68616e7aebfeac5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
 * Tests for the partial (ASYNC) rebuild path of the {@link IndexTool} map-reduce job:
 * index rows are deleted directly in HBase, the index is put back into BUILDING state
 * via {@code ALTER INDEX ... REBUILD ASYNC}, and the tool is expected to bring the
 * index back in sync with the data table.
 */
public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {

    /** Subclasses flip this to run the same scenario against namespace-mapped tables. */
    protected boolean isNamespaceEnabled = false;
    protected final String tableDDLOptions;

    public IndexToolForPartialBuildIT() {
        StringBuilder optionBuilder = new StringBuilder();
        optionBuilder.append(" SPLIT ON(1,2)");
        this.tableDDLOptions = optionBuilder.toString();
    }

    /**
     * Server-side properties shared with subclasses. The automatic rebuild of failed
     * indexes is disabled so that the MR {@link IndexTool} is the only rebuild path
     * exercised by this test.
     *
     * @return mutable map of server configuration overrides
     */
    public static Map<String, String> getServerProperties() {
        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
        // Bug fix: the key previously had a leading space (" yarn.scheduler..."),
        // which stored it under the wrong name and made the override a silent no-op.
        serverProps.put("yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
        serverProps.put("hbase.client.pause", "5000");
        // Disable the background partial rebuilder; on write failure just disable the index.
        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
                Boolean.FALSE.toString());
        serverProps.put(QueryServices.INDEX_FAILURE_DISABLE_INDEX, Boolean.TRUE.toString());
        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB,
                Long.toString(2000));
        return serverProps;
    }

    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        Map<String, String> serverProps = getServerProperties();
        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
                ReadOnlyProps.EMPTY_PROPS);
    }

    /**
     * End-to-end scenario: create a table and a functional index, corrupt the index by
     * deleting rows from its physical HBase table, mark it BUILDING via ASYNC REBUILD,
     * run the IndexTool MR job, and verify queries switch back to the index and return
     * all rows.
     */
    @Test
    public void testSecondaryIndex() throws Exception {
        String schemaName = generateUniqueName();
        String dataTableName = generateUniqueName();
        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
        final String indxTable = String.format("%s_IDX", dataTableName);
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
        props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
                Boolean.toString(isNamespaceEnabled));
        final Connection conn = DriverManager.getConnection(getUrl(), props);
        Statement stmt = conn.createStatement();
        try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
            if (isNamespaceEnabled) {
                conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
            }
            stmt.execute(String.format(
                    "CREATE IMMUTABLE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
                    fullTableName, tableDDLOptions));
            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
            // insert two rows before the index exists
            upsertRow(stmt1, 1000);
            upsertRow(stmt1, 2000);
            conn.commit();
            stmt.execute(String.format(
                    "CREATE INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
                    indxTable, fullTableName));
            upsertRow(stmt1, 3000);
            upsertRow(stmt1, 4000);
            upsertRow(stmt1, 5000);
            conn.commit();
            // Corrupt the index: delete all but the first two index rows directly in HBase
            // so the index no longer matches the data table.
            PTable pindexTable =
                    PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
            // Fix: the ResultScanner was previously never closed; manage both resources
            // with try-with-resources.
            try (Table hTable = admin.getConnection()
                        .getTable(TableName.valueOf(pindexTable.getPhysicalName().toString()));
                    ResultScanner scanner = hTable.getScanner(new Scan())) {
                int cnt = 0;
                for (Result res = scanner.next(); res != null; res = scanner.next()) {
                    cnt++;
                    if (cnt > 2) {
                        hTable.delete(new Delete(res.getRow()));
                    }
                }
            }
            TestUtil.doMajorCompaction(conn, pindexTable.getPhysicalName().toString());
            conn.createStatement().execute(
                    String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));
            // The index must now be reported as BUILDING.
            ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName),
                    indxTable, new String[] { PTableType.INDEX.toString() });
            assertTrue(rs.next());
            assertEquals(indxTable, rs.getString(3));
            String indexState = rs.getString("INDEX_STATE");
            assertEquals(PIndexState.BUILDING.toString(), indexState);
            assertFalse(rs.next());
            upsertRow(stmt1, 6000);
            upsertRow(stmt1, 7000);
            conn.commit();
            String selectSql = String.format(
                    "SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
            // While the index is BUILDING, the plan must scan the data table.
            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null,
                    isNamespaceEnabled);
            // Fix: use the plain Statement here — the JDBC spec forbids calling
            // executeQuery(String) on a PreparedStatement.
            rs = stmt.executeQuery(selectSql);
            for (int i = 1; i <= 7; i++) {
                assertTrue(rs.next());
                assertEquals("xxUNAME" + i * 1000 + "_xyz", rs.getString(1));
            }
            // Validate index table contents up to the disabled timestamp: rows 1-2
            // (pre-corruption) and 6-7 (written while BUILDING) are present, 3-5 are not.
            rs = stmt.executeQuery(
                    String.format("SELECT * FROM %s", SchemaUtil.getTableName(schemaName, indxTable)));
            for (int i = 1; i <= 2; i++) {
                assertTrue(rs.next());
                assertEquals("xxUNAME" + i * 1000 + "_xyz", rs.getString(1));
            }
            for (int i = 6; i <= 7; i++) {
                assertTrue(rs.next());
                assertEquals("xxUNAME" + i * 1000 + "_xyz", rs.getString(1));
            }
            assertFalse(rs.next());
            // Run the index MR job to rebuild the missing rows.
            final IndexTool indexingTool = new IndexTool();
            Configuration conf = new Configuration(getUtility().getConfiguration());
            conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
                    Boolean.toString(isNamespaceEnabled));
            indexingTool.setConf(conf);
            final String[] cmdArgs = getArgValues(schemaName, dataTableName, indxTable);
            int status = indexingTool.run(cmdArgs);
            assertEquals(0, status);
            // insert two more rows after the rebuild completed
            upsertRow(stmt1, 8000);
            upsertRow(stmt1, 9000);
            conn.commit();
            // The plan must now use the (ACTIVE) index table again.
            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
            actualExplainPlan = QueryUtil.getExplainPlan(rs);
            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable,
                    isNamespaceEnabled);
            rs = stmt.executeQuery(selectSql);
            for (int i = 1; i <= 9; i++) {
                assertTrue(rs.next());
                assertEquals("xxUNAME" + i * 1000 + "_xyz", rs.getString(1));
            }
            assertFalse(rs.next());
        } finally {
            conn.close();
        }
    }

    /**
     * Asserts that {@code actualExplainPlan} contains a full scan over the expected
     * physical table: the index table when {@code indxTable} is non-null, otherwise
     * the data table.
     */
    public static void assertExplainPlan(final String actualExplainPlan, String schemaName,
            String dataTable, String indxTable, boolean isNamespaceMapped) {
        String expectedExplainPlan = "";
        if (indxTable != null) {
            expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
                    SchemaUtil.getPhysicalHBaseTableName(schemaName, indxTable, isNamespaceMapped));
        } else {
            expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
                    SchemaUtil.getPhysicalHBaseTableName(schemaName, dataTable, isNamespaceMapped));
        }
        assertTrue(actualExplainPlan.contains(expectedExplainPlan));
    }

    /**
     * Builds the command-line arguments for {@link IndexTool}: target schema/table/index,
     * foreground execution, and an output path prefix.
     */
    public String[] getArgValues(String schemaName, String dataTable, String indexName) {
        final List<String> args = Lists.newArrayList();
        if (schemaName != null) {
            args.add("-s");
            args.add(schemaName);
        }
        args.add("-dt");
        args.add(dataTable);
        // complete index rebuild
        args.add("-it");
        args.add(indexName);
        args.add("-runfg");
        args.add("-op");
        args.add("/tmp/output/partialTable_");
        return args.toArray(new String[0]);
    }

    /**
     * Upserts one row (ID = i, NAME = "uname" + i, ZIP = 95050 + i) through the
     * prepared upsert statement; the caller is responsible for committing.
     */
    public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
        stmt.setInt(1, i);
        stmt.setString(2, "uname" + i);
        stmt.setInt(3, 95050 + i);
        stmt.executeUpdate();
    }
}