/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.index;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.util.*;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.*;
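/**
 * Additional mutable index ITs that need their own mini cluster: compaction of plain HBase
 * tables that carry Phoenix coprocessors but have no SYSTEM.CATALOG row, and reading local
 * index data after region merges.
 */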
@RunWith(Parameterized.class)
@Category(NeedsOwnMiniClusterTest.class)
public class MutableIndexExtendedIT extends ParallelStatsDisabledIT {
private static final Logger LOG = LoggerFactory.getLogger(MutableIndexExtendedIT.class);
protected final boolean localIndex;
protected final String tableDDLOptions;
public MutableIndexExtendedIT(Boolean localIndex, String txProvider, Boolean columnEncoded) {
this.localIndex = localIndex;
StringBuilder optionBuilder = new StringBuilder();
if (txProvider != null) {
optionBuilder.append("TRANSACTIONAL=true," + PhoenixDatabaseMetaData.TRANSACTION_PROVIDER
+ "='" + txProvider + "'");
}
if (!columnEncoded) {
if (optionBuilder.length() != 0) optionBuilder.append(",");
optionBuilder.append("COLUMN_ENCODED_BYTES=0");
}
this.tableDDLOptions = optionBuilder.toString();
}
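// Connections used by these tests lower the index mutate batch size threshold to 1
// (QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB), presumably so that the batched
// index update path is exercised even for single-row mutations.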
private static Connection getConnection(Properties props) throws SQLException {
props.setProperty(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB,
Integer.toString(1));
Connection conn = DriverManager.getConnection(getUrl(), props);
return conn;
}
protected static Connection getConnection() throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
return getConnection(props);
}
// name is used by failsafe as file name in reports
@Parameterized.Parameters(name = "MutableIndexExtendedIT_localIndex={0},transactionProvider={1},columnEncoded={2}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ false, null, false }, { false, null, true },
// OMID does not support local indexes or column encoding
{ false, "OMID", false },
{ true, null, false }, { true, null, true },
});
}
// Some tables (e.g. indexes on views) have the UngroupedAggregateRegionObserver coprocessor
// loaded, but don't have a corresponding row in SYSTEM.CATALOG. This test verifies that
// compaction isn't blocked for them.
// TODO: Move to a different test class?
@Test(timeout = 120000)
public void testCompactNonPhoenixTable() throws Exception {
if (localIndex || tableDDLOptions.contains("TRANSACTIONAL=true")) return;
try (Connection conn = getConnection()) {
// create a vanilla HBase table (non-Phoenix)
String randomTable = generateUniqueName();
TableName hbaseTN = TableName.valueOf(randomTable);
byte[] famBytes = Bytes.toBytes("fam");
Table hTable = getUtility().createTable(hbaseTN, famBytes);
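// Manually attach the Phoenix UngroupedAggregateRegionObserver coprocessor; the table has no
// SYSTEM.CATALOG entry, mimicking the "coproc loaded but not in syscat" case described above.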
TestUtil.addCoprocessor(conn, randomTable, UngroupedAggregateRegionObserver.class);
Put put = new Put(Bytes.toBytes("row"));
byte[] value = new byte[1];
Bytes.random(value);
put.addColumn(famBytes, Bytes.toBytes("colQ"), value);
hTable.put(put);
// major compaction shouldn't cause a timeout or RS abort
List<HRegion> regions = getUtility().getHBaseCluster().getRegions(hbaseTN);
HRegion hRegion = regions.get(0);
hRegion.flush(true);
HStore store = hRegion.getStore(famBytes);
// Trigger major compaction
store.triggerMajorCompaction();
Optional<CompactionContext> requestCompaction =
store.requestCompaction(org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER,
CompactionLifeCycleTracker.DUMMY, null);
store.compact(requestCompaction.get(), NoLimitThroughputController.INSTANCE, null);
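// A major compaction should leave the store with exactly one HFile.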
assertEquals(1, store.getStorefiles().size());
// We should be able to compact SYSTEM.CATALOG itself as well
regions = getUtility().getHBaseCluster()
.getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
hRegion = regions.get(0);
hRegion.flush(true);
store = hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
// Trigger major compaction
store.triggerMajorCompaction();
requestCompaction =
store.requestCompaction(org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER,
CompactionLifeCycleTracker.DUMMY, null);
store.compact(requestCompaction.get(), NoLimitThroughputController.INSTANCE, null);
assertEquals(1, store.getStorefiles().size());
}
}
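// Ignored for now; see the TODO below: the count query sometimes returns 5 rows instead of
// the expected 4 (possible duplicate results), which makes the assertion in the merge loop flaky.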
@Test
@Ignore
public void testIndexHalfStoreFileReader() throws Exception {
if (!localIndex) return;
Connection conn1 = getConnection();
ConnectionQueryServices connectionQueryServices =
driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES);
Admin admin = connectionQueryServices.getAdmin();
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
createBaseTable(conn1, tableName, "('e')");
conn1.createStatement().execute(
"CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + tableName
+ "(v1)" + (localIndex ? "" : " SPLIT ON ('e')"));
conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z')");
conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'z')");
conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a')");
conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c')");
conn1.commit();
String query = "SELECT count(*) FROM " + tableName + " where v1<='z'";
ResultSet rs = conn1.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
TableName indexTable = TableName.valueOf(localIndex ? tableName : indexName);
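// Flush so the index data is persisted to store files before the region merge below.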
admin.flush(indexTable);
boolean merged = false;
// merge regions until 1 left
long numRegions = 0;
while (true) {
rs = conn1.createStatement().executeQuery(query);
assertTrue(rs.next());
// TODO: this sometimes returns 5 instead of 4, duplicate results?
assertEquals(4, rs.getInt(1));
try {
List<RegionInfo> indexRegions = admin.getRegions(indexTable);
numRegions = indexRegions.size();
if (numRegions == 1) {
break;
}
if (!merged) {
List<RegionInfo> regions = admin.getRegions(indexTable);
LOG.info("Merging: " + regions.size());
admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(),
regions.get(1).getEncodedNameAsBytes(), false);
merged = true;
Threads.sleep(10000);
}
} catch (Exception ex) {
LOG.info("error:", ex);
}
long waitStartTime = System.currentTimeMillis();
// wait for the merge to complete
while (System.currentTimeMillis() - waitStartTime < 10000) {
List<RegionInfo> regions = admin.getRegions(indexTable);
LOG.info("Waiting:" + regions.size());
if (regions.size() < numRegions) {
break;
}
Threads.sleep(1000);
}
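// Make sure all regions of the (possibly just merged) index table are back online.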
SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(), indexTable);
assertTrue("Index table should be online ", admin.isTableAvailable(indexTable));
}
}
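/**
 * Creates the base data table using the configured DDL options and, if given, the split points.
 */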
protected void createBaseTable(Connection conn, String tableName, String splits)
throws SQLException {
String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n"
+ "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n"
+ "v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
+ (tableDDLOptions != null ? tableDDLOptions : "")
+ (splits != null ? (" split on " + splits) : "");
conn.createStatement().execute(ddl);
}
}