blob: e4add9afec4e9419420b4c383f5741a5e0a39f3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
public class SkipScanAfterManualSplitIT extends ParallelStatsDisabledIT {
private static final int BATCH_SIZE = 25;
private static final int MAX_FILESIZE = 1024 * 10;
private static final int PAYLOAD_SIZE = 1024;
private static final String PAYLOAD;
static {
StringBuilder buf = new StringBuilder();
for (int i = 0; i < PAYLOAD_SIZE; i++) {
buf.append('a');
}
PAYLOAD = buf.toString();
}
private static final int MIN_CHAR = 'a';
private static final int MAX_CHAR = 'z';
private static Connection getConnection() throws SQLException {
Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
Connection conn = DriverManager.getConnection(getUrl(), props);
return conn;
}
private static void initTable(String tableName) throws Exception {
Connection conn = getConnection();
conn.createStatement().execute("CREATE TABLE " + tableName + "("
+ "a VARCHAR PRIMARY KEY, b VARCHAR) "
+ HTableDescriptor.MAX_FILESIZE + "=" + MAX_FILESIZE + ","
+ " SALT_BUCKETS = 4");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)");
int rowCount = 0;
for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) {
for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) {
String pk = Character.toString((char)c1) + Character.toString((char)c2);
stmt.setString(1, pk);
stmt.setString(2, PAYLOAD);
stmt.execute();
rowCount++;
if (rowCount % BATCH_SIZE == 0) {
conn.commit();
}
}
}
conn.commit();
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
HBaseAdmin admin = services.getAdmin();
try {
admin.flush(tableName);
} finally {
admin.close();
}
conn.close();
}
@Test
public void testManualSplit() throws Exception {
String tableName = generateUniqueName();
byte[] tableNameBytes = Bytes.toBytes(tableName);
initTable(tableName);
Connection conn = getConnection();
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
int nRegions = services.getAllTableRegions(tableNameBytes).size();
int nInitialRegions = nRegions;
HBaseAdmin admin = services.getAdmin();
try {
admin.split(tableName);
int nTries = 0;
while (nRegions == nInitialRegions && nTries < 10) {
Thread.sleep(1000);
nRegions = services.getAllTableRegions(tableNameBytes).size();
nTries++;
}
// Split finished by this time, but cache isn't updated until
// table is accessed
assertEquals(nRegions, nInitialRegions);
int nRows = 2;
String query = "SELECT count(*) FROM " + tableName + " WHERE a IN ('tl','jt',' a',' b',' c',' d')";
ResultSet rs1 = conn.createStatement().executeQuery(query);
assertTrue(rs1.next());
nRegions = services.getAllTableRegions(tableNameBytes).size();
// Region cache has been updated, as there are more regions now
assertNotEquals(nRegions, nInitialRegions);
/*
if (nRows != rs1.getInt(1)) {
// Run the same query again and it always passes now
// (as region cache is up-to-date)
ResultSet r2 = conn.createStatement().executeQuery(query);
assertTrue(r2.next());
assertEquals(nRows, r2.getInt(1));
}
*/
assertEquals(nRows, rs1.getInt(1));
} finally {
admin.close();
}
}
/**
* The length of the row keys used for this test. Needed to create split points.
*/
private static final int REGION_BOUND_LENGTH_BYTES = 54;
/**
* Takes the given byteArrays and concatenates them in a buffer of length
* #REGION_BOUND_LENGTH_BYTES. if the byte arrays have a combined length of less than
* #REGION_BOUND_LENGTH_BYTES, pads with zeros. if they have a combined length of greater
* than the limit, throws a BufferOverflowException.
* @param byteArrays the byte arrays to concatenate in the row key buffer
* @return the final resulting row key.
*/
private static byte[] bytesToRowKey(byte[]... byteArrays) {
ByteBuffer buffer = ByteBuffer.allocate(REGION_BOUND_LENGTH_BYTES);
for(byte[] byteArray : byteArrays) {
buffer.put(byteArray);
}
return buffer.array();
}
/**
* Creates a region boundary at the given row key values. Follows the schema used in
* #testSkipScanIntersectStateReset().
* @param salt this region's salt byte
* @param orgId the first row key value, a string of length 15
* @param parentId the second row key value, a string of length 15
* @param invertedDate the long timestamp of a date value, with the sign bit flipped
* @param entityId the final row key value, a string of length 15
* @return the region boundary found at these row key values
*/
private static byte[] getRegionBoundary(int salt, String orgId, String parentId, long invertedDate, String entityId) {
return bytesToRowKey(new byte[] {(byte)salt}, Bytes.toBytes(orgId), Bytes.toBytes(parentId), Bytes.toBytes(invertedDate), Bytes.toBytes(entityId));
}
/**
* Creates a region boundary at the given salt byte. This is the boundary that would be used
* when pre-splitting the regions for a salted table.
* @param salt this region's salt byte
* @return the region boundary for this salt byte
*/
private static byte[] getSaltBoundary(int salt) {
return bytesToRowKey(new byte[] {(byte)salt});
}
/**
* The region boundaries used to split the table at the start of the test.
* These region boundaries were extracted from a reproducing case used during bug fixing.
* Only specific combinations of boundaries will interact with each other in the way needed to
* cause regions to be missed.
*/
private static final byte[][] REGION_BOUNDS = {
getRegionBoundary(0, "00Dxx0000001gER", "001xx000003DGz2", 9223370631742791807L, "017xx0000022OGX"),
getRegionBoundary(0, "00Dxx0000001gER", "001xx000003DHlF", 9223370631742760807L, "017xx0000022WMz"),
getRegionBoundary(0, "00Dxx0000001gER", "001xx000003DINU", 9223370631742737807L, "017xx0000022dPO"),
getSaltBoundary(1),
getRegionBoundary(1, "00Dxx0000001gER", "001xx000003DGu0", 9223370631742793807L, "017xx0000022Nes"),
getRegionBoundary(1, "00Dxx0000001gER", "001xx000003DHfN", 9223370631742900807L, "017xx0000022GtM"),
getRegionBoundary(1, "00Dxx0000001gER", "001xx000003DIMd", 9223370631742737807L, "017xx0000022cw6"),
getSaltBoundary(2),
getRegionBoundary(2, "00Dxx0000001gER", "001xx000003DGyV", 9223370631742791807L, "017xx0000022OJn"),
getRegionBoundary(2, "00Dxx0000001gER", "001xx000003DHk4", 9223370631742760807L, "017xx0000022Wb0"),
getRegionBoundary(2, "00Dxx0000001gER", "001xx000003DIRW", 9223370631742736807L, "017xx0000022dVq"),
getSaltBoundary(3),
getRegionBoundary(3, "00Dxx0000001gER", "001xx000003DGul", 9223370631742793807L, "017xx0000022NMC"),
getRegionBoundary(3, "00Dxx0000001gER", "001xx000003DHgC", 9223370631742762807L, "017xx0000022WAK"),
getRegionBoundary(3, "00Dxx0000001gER", "001xx000003DIMV", 9223370631742737807L, "017xx0000022d2P"),
getSaltBoundary(4),
getRegionBoundary(4, "00Dxx0000001gER", "001xx000003DGye", 9223370631742791807L, "017xx0000022NyS"),
getRegionBoundary(4, "00Dxx0000001gER", "001xx000003DHiz", 9223370631742762807L, "017xx0000022Vz3"),
getRegionBoundary(4, "00Dxx0000001gER", "001xx000003DILw", 9223370631742887807L, "017xx0000022HZv"),
getSaltBoundary(5),
getRegionBoundary(5, "00Dxx0000001gER", "001xx000003DGy7", 9223370631742791807L, "017xx0000022O8t"),
getRegionBoundary(5, "00Dxx0000001gER", "001xx000003DHip", 9223370631742762807L, "017xx0000022W5R"),
getRegionBoundary(5, "00Dxx0000001gER", "001xx000003DIMP", 9223370631742737807L, "017xx0000022d8h"),
getSaltBoundary(6),
getRegionBoundary(6, "00Dxx0000001gER", "001xx000003DGzO", 9223370631742791807L, "017xx0000022Nti"),
getRegionBoundary(6, "00Dxx0000001gER", "001xx000003DHmV", 9223370631742759807L, "017xx0000022XH9"),
getRegionBoundary(6, "00Dxx0000001gER", "001xx000003DISr", 9223370631742733807L, "017xx0000022e5A"),
getSaltBoundary(7),
getRegionBoundary(7, "00Dxx0000001gER", "001xx000003DGtW", 9223370631742916807L, "017xx0000022G7V"),
getRegionBoundary(7, "00Dxx0000001gER", "001xx000003DHhw", 9223370631742762807L, "017xx0000022W2c"),
getRegionBoundary(7, "00Dxx0000001gER", "001xx000003DIKn", 9223370631742740807L, "017xx0000022ceD"),
getSaltBoundary(8),
getRegionBoundary(8, "00Dxx0000001gER", "001xx000003DH0j", 9223370631742790807L, "017xx0000022OvN"),
getRegionBoundary(8, "00Dxx0000001gER", "001xx000003DHmR", 9223370631742759807L, "017xx0000022WyU"),
getRegionBoundary(8, "00Dxx0000001gER", "001xx000003DIMl", 9223370631742737807L, "017xx0000022czJ"),
getSaltBoundary(9),
getRegionBoundary(9, "00Dxx0000001gER", "001xx000003DGtF", 9223370631742916807L, "017xx0000022G7E"),
getRegionBoundary(9, "00Dxx0000001gER", "001xx000003DHhi", 9223370631742762807L, "017xx0000022Vk1"),
getRegionBoundary(9, "00Dxx0000001gER", "001xx000003DIKP", 9223370631742740807L, "017xx0000022cpA"),
getSaltBoundary(10),
getRegionBoundary(10, "00Dxx0000001gER", "001xx000003DGzU", 9223370631742791807L, "017xx0000022Nsb"),
getRegionBoundary(10, "00Dxx0000001gER", "001xx000003DHmO", 9223370631742760807L, "017xx0000022WFU"),
getRegionBoundary(10, "00Dxx0000001gER", "001xx000003DISr", 9223370631742733807L, "017xx0000022e55"),
getSaltBoundary(11),
getRegionBoundary(11, "00Dxx0000001gER", "001xx000003DGzB", 9223370631742791807L, "017xx0000022OLb"),
getRegionBoundary(11, "00Dxx0000001gER", "001xx000003DHki", 9223370631742760807L, "017xx0000022WOU"),
getRegionBoundary(11, "00Dxx0000001gER", "001xx000003DIOF", 9223370631742737807L, "017xx0000022dIS"),
getSaltBoundary(12),
getRegionBoundary(12, "00Dxx0000001gER", "001xx000003DH0X", 9223370631742790807L, "017xx0000022OoI"),
getRegionBoundary(12, "00Dxx0000001gER", "001xx000003DHkT", 9223370631742760807L, "017xx0000022WSs"),
getRegionBoundary(12, "00Dxx0000001gER", "001xx000003DILp", 9223370631742740807L, "017xx0000022cOL"),
getSaltBoundary(13),
getRegionBoundary(13, "00Dxx0000001gER", "001xx000003DGvw", 9223370631742793807L, "017xx0000022Ncy"),
getRegionBoundary(13, "00Dxx0000001gER", "001xx000003DHi8", 9223370631742762807L, "017xx0000022VjH"),
getRegionBoundary(13, "00Dxx0000001gER", "001xx000003DINt", 9223370631742737807L, "017xx0000022dLm"),
getSaltBoundary(14),
getRegionBoundary(14, "00Dxx0000001gER", "001xx000003DGzJ", 9223370631742791807L, "017xx0000022Nwo"),
getRegionBoundary(14, "00Dxx0000001gER", "001xx000003DHls", 9223370631742760807L, "017xx0000022WH4"),
getRegionBoundary(14, "00Dxx0000001gER", "001xx000003DIRy", 9223370631742736807L, "017xx0000022doL"),
getSaltBoundary(15),
getRegionBoundary(15, "00Dxx0000001gER", "001xx000003DGsy", 9223370631742794807L, "017xx0000022MsO"),
getRegionBoundary(15, "00Dxx0000001gER", "001xx000003DHfG", 9223370631742764807L, "017xx0000022Vdz"),
getRegionBoundary(15, "00Dxx0000001gER", "001xx000003DIM9", 9223370631742737807L, "017xx0000022dAT")
};
/**
* Tests that the SkipScan behaves properly with an InList of RowValueConstructors after
* many table splits. It verifies that the SkipScan's internal state is reset properly when
* intersecting with region boundaries. Long row keys exposed an issue where calls to intersect
* would start mid-way through the range of values and produce incorrect SkipScanFilters when
* creating region specific filters in {@link org.apache.phoenix.util.ScanUtil#intersectScanRange}
* See PHOENIX-1133 and PHOENIX-1136 on apache JIRA for more details.
* @throws java.sql.SQLException from Connection
*/
@Test
public void testSkipScanInListOfRVCAfterManualSplit() throws SQLException {
Connection conn = getConnection();
String tableName = generateUniqueName();
String ddl = "CREATE TABLE " + tableName + " ( "
+ "organization_id CHAR(15) NOT NULL, "
+ "parent_id CHAR(15) NOT NULL, "
+ "created_date DATE NOT NULL, "
+ "entity_history_id CHAR(15) NOT NULL, "
+ "created_by_id VARCHAR "
+ "CONSTRAINT pk PRIMARY KEY (organization_id, parent_id, created_date DESC, entity_history_id)) "
+ "SALT_BUCKETS = 16 "
+ "SPLIT ON ("
+ "?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?"
+ ")"; // 63 split points, 64 total regions
PreparedStatement ddlStmt = conn.prepareStatement(ddl);
for(int i = 0; i < REGION_BOUNDS.length; i++) {
ddlStmt.setBytes(i + 1, REGION_BOUNDS[i]);
}
ddlStmt.execute();
conn.commit();
final String upsertPrefix = "UPSERT INTO " + tableName + " VALUES ( '00Dxx0000001gER', ";
conn.createStatement().executeUpdate(upsertPrefix + "'001xx000003DGr4', TO_DATE('2014-07-11 20:53:01'), '017xx0000022MmH', '005xx000001Sv21' )");
conn.createStatement().executeUpdate(upsertPrefix + "'001xx000003DGr5', TO_DATE('2014-07-11 20:53:01'), '017xx0000022Mln', '005xx000001Sv21' )");
conn.createStatement().executeUpdate(upsertPrefix + "'001xx000003DGsy', TO_DATE('2014-07-11 20:53:01'), '017xx0000022MsO', '005xx000001Sv21' )");
conn.createStatement().executeUpdate(upsertPrefix + "'001xx000003DGsy', TO_DATE('2014-07-11 20:53:01'), '017xx0000022MsS', '005xx000001Sv21' )");
conn.createStatement().executeUpdate(upsertPrefix + "'001xx000003DGtE', TO_DATE('2014-07-11 20:53:01'), '017xx0000022Mnx', '005xx000001Sv21' )");
conn.createStatement().executeUpdate(upsertPrefix + "'001xx000003DGtn', TO_DATE('2014-07-11 20:53:02'), '017xx0000022Nmv', '005xx000001Sv21' )");
conn.commit();
String sql = "SELECT "
+ "CREATED_BY_ID, PARENT_ID "
+ "FROM " + tableName
+ " WHERE ORGANIZATION_ID='00Dxx0000001gER' "
+ "AND (PARENT_ID,CREATED_DATE,ENTITY_HISTORY_ID) IN ("
+ "('001xx000003DGr4',TO_DATE('2014-07-11 20:53:01'),'017xx0000022MmH'),"
+ "('001xx000003DGr5',TO_DATE('2014-07-11 20:53:01'),'017xx0000022Mln'),"
+ "('001xx000003DGsy',TO_DATE('2014-07-11 20:53:01'),'017xx0000022MsO'),"
+ "('001xx000003DGsy',TO_DATE('2014-07-11 20:53:01'),'017xx0000022MsS'),"
+ "('001xx000003DGtE',TO_DATE('2014-07-11 20:53:01'),'017xx0000022Mnx'),"
+ "('001xx000003DGtn',TO_DATE('2014-07-11 20:53:02'),'017xx0000022Nmv')"
+ ") ORDER BY PARENT_ID";
ResultSet rs = conn.createStatement().executeQuery(sql);
final String expectedCreatedById = "005xx000001Sv21";
final String[] expectedParentIds = {
"001xx000003DGr4",
"001xx000003DGr5",
"001xx000003DGsy",
"001xx000003DGsy",
"001xx000003DGtE",
"001xx000003DGtn"
};
for(String expectedParentId : expectedParentIds) {
assertTrue(rs.next());
assertEquals(expectedCreatedById, rs.getString(1));
assertEquals(expectedParentId, rs.getString(2));
}
assertFalse(rs.next());
}
@Test
public void testMinMaxRangeIntersection() throws Exception {
Connection conn = getConnection();
String tableName = generateUniqueName();
PreparedStatement stmt = conn.prepareStatement("create table " + tableName
+ "(pk1 UNSIGNED_TINYINT NOT NULL, pk2 UNSIGNED_TINYINT NOT NULL, kv VARCHAR "
+ "CONSTRAINT pk PRIMARY KEY (pk1, pk2)) SALT_BUCKETS=4 SPLIT ON (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
// Split each salt bucket into multiple regions
stmt.setBytes(1, new byte[] {0, 1, 1});
stmt.setBytes(2, new byte[] {0, 2, 1});
stmt.setBytes(3, new byte[] {0, 3, 1});
stmt.setBytes(4, new byte[] {1, 1, 1});
stmt.setBytes(5, new byte[] {1, 2, 1});
stmt.setBytes(6, new byte[] {1, 3, 1});
stmt.setBytes(7, new byte[] {2, 1, 1});
stmt.setBytes(8, new byte[] {2, 2, 1});
stmt.setBytes(9, new byte[] {2, 3, 1});
stmt.setBytes(10, new byte[] {3, 1, 1});
stmt.setBytes(11, new byte[] {3, 2, 1});
stmt.setBytes(12, new byte[] {3, 3, 1});
stmt.execute();
// Use a query with a RVC in a non equality expression
ResultSet rs = conn.createStatement().executeQuery("select count(kv) from " + tableName + " where pk1 <= 3 and (pk1,PK2) >= (3, 1)");
assertTrue(rs.next());
}
}