blob: 76aea5b6816edd3d2fc71af180c61102d380d9a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.iterate.ScanningResultPostDummyResultCaller;
import org.apache.phoenix.iterate.ScanningResultPostValidResultCaller;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* ServerPagingIT tests that include some region moves while performing rs#next.
*/
@Category(NeedsOwnMiniClusterTest.class)
public class ServerPagingWithRegionMovesIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(ServerPagingWithRegionMovesIT.class);
private static boolean hasTestStarted = false;
private static int countOfDummyResults = 0;
private static int countOfValidResults = 0;
private static final List<String> TABLE_NAMES = Collections.synchronizedList(new ArrayList<>());
private static class TestScanningResultPostValidResultCaller extends
ScanningResultPostValidResultCaller {
@Override
public void postValidRowProcess() {
if (hasTestStarted && (countOfValidResults++ % 2) == 0 &&
(countOfValidResults < 17 ||
countOfValidResults > 28 && countOfValidResults < 40)) {
LOGGER.info("Moving regions of tables {}. current count of valid results: {}",
TABLE_NAMES, countOfValidResults);
TABLE_NAMES.forEach(table -> {
try {
moveRegionsOfTable(table);
} catch (Exception e) {
LOGGER.error("Unable to move regions of table: {}", table);
}
});
}
}
}
private static class TestScanningResultPostDummyResultCaller extends
ScanningResultPostDummyResultCaller {
@Override
public void postDummyProcess() {
if (hasTestStarted && (countOfDummyResults++ % 3) == 0 &&
(countOfDummyResults < 17 ||
countOfDummyResults > 28 && countOfDummyResults < 40)) {
LOGGER.info("Moving regions of tables {}. current count of dummy results: {}",
TABLE_NAMES, countOfDummyResults);
TABLE_NAMES.forEach(table -> {
try {
moveRegionsOfTable(table);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
}
}
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, String.valueOf(2));
props.put(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, String.valueOf(1));
props.put(QueryServices.PHOENIX_POST_DUMMY_PROCESS,
TestScanningResultPostDummyResultCaller.class.getName());
props.put(QueryServices.PHOENIX_POST_VALID_PROCESS,
TestScanningResultPostValidResultCaller.class.getName());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@After
public void tearDown() throws Exception {
TABLE_NAMES.clear();
hasTestStarted = false;
countOfDummyResults = 0;
countOfValidResults = 0;
}
private void assertServerPagingMetric(String tableName, ResultSet rs, boolean isPaged)
throws SQLException {
Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
assertEquals(String.format("Got %s", entry.getKey()), tableName, entry.getKey());
Map<MetricType, Long> metricValues = entry.getValue();
Long pagedRowsCntr = metricValues.get(MetricType.PAGED_ROWS_COUNTER);
assertNotNull(pagedRowsCntr);
if (isPaged) {
assertTrue(String.format("Got %d", pagedRowsCntr), pagedRowsCntr > 0);
} else {
assertEquals(String.format("Got %d", pagedRowsCntr), 0, (long) pagedRowsCntr);
}
}
assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0);
}
@Test
public void testOrderByNonAggregation() throws Exception {
hasTestStarted = true;
final String tablename = generateUniqueName();
final String tenantId = getOrganizationId();
final Date D1 = DateUtil.parseDate("1970-01-01 00:58:00");
final Date D2 = DateUtil.parseDate("1970-01-01 01:02:00");
final Date D3 = DateUtil.parseDate("1970-01-01 01:30:00");
final Date D4 = DateUtil.parseDate("1970-01-01 01:45:00");
final Date D5 = DateUtil.parseDate("1970-01-01 02:00:00");
final Date D6 = DateUtil.parseDate("1970-01-01 04:00:00");
final String F1 = "A";
final String F2 = "B";
final String F3 = "C";
final String R1 = "R1";
final String R2 = "R2";
byte[][] splits = new byte[][] {
ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D3)),
ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D5)),
};
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "create table " + tablename +
" (organization_id char(15) not null," +
" date date not null," +
" feature char(1) not null," +
" unique_users integer not null,\n" +
" transactions bigint,\n" +
" region varchar,\n" +
" CONSTRAINT pk PRIMARY KEY (organization_id, \"DATE\", feature, unique_users))";
TABLE_NAMES.add(tablename);
StringBuilder buf = new StringBuilder(ddl);
if (splits != null) {
buf.append(" SPLIT ON (");
for (int i = 0; i < splits.length; i++) {
buf.append("'").append(Bytes.toString(splits[i])).append("'").append(",");
}
buf.setCharAt(buf.length()-1, ')');
}
ddl = buf.toString();
conn.createStatement().execute(ddl);
PreparedStatement stmt = conn.prepareStatement(
"upsert into " + tablename +
" (" +
" ORGANIZATION_ID, " +
" \"DATE\", " +
" FEATURE, " +
" UNIQUE_USERS, " +
" TRANSACTIONS, " +
" REGION) " +
"VALUES (?, ?, ?, ?, ?, ?)");
stmt.setString(1, tenantId);
stmt.setDate(2, D1);
stmt.setString(3, F1);
stmt.setInt(4, 10);
stmt.setLong(5, 100L);
stmt.setString(6, R2);
stmt.execute();
stmt.setString(1, tenantId);
stmt.setDate(2, D2);
stmt.setString(3, F1);
stmt.setInt(4, 20);
stmt.setLong(5, 200);
stmt.setString(6, null);
stmt.execute();
stmt.setString(1, tenantId);
stmt.setDate(2, D3);
stmt.setString(3, F1);
stmt.setInt(4, 30);
stmt.setLong(5, 300);
stmt.setString(6, R1);
stmt.execute();
stmt.setString(1, tenantId);
stmt.setDate(2, D4);
stmt.setString(3, F2);
stmt.setInt(4, 40);
stmt.setLong(5, 400);
stmt.setString(6, R1);
stmt.execute();
stmt.setString(1, tenantId);
stmt.setDate(2, D5);
stmt.setString(3, F3);
stmt.setInt(4, 50);
stmt.setLong(5, 500);
stmt.setString(6, R2);
stmt.execute();
stmt.setString(1, tenantId);
stmt.setDate(2, D6);
stmt.setString(3, F1);
stmt.setInt(4, 60);
stmt.setLong(5, 600);
stmt.setString(6, null);
stmt.execute();
conn.commit();
}
String query = "SELECT \"DATE\", transactions t FROM "+tablename+
" WHERE organization_id=? AND unique_users <= 30 ORDER BY t DESC LIMIT 2";
try (Connection conn = DriverManager.getConnection(getUrl(), props);
PreparedStatement statement = conn.prepareStatement(query)) {
TestUtil.dumpTable(conn, TableName.valueOf(tablename));
statement.setString(1, tenantId);
try (ResultSet rs = statement.executeQuery()) {
moveRegionsOfTable(tablename);
assertTrue(rs.next());
assertEquals(D3.getTime(), rs.getDate(1).getTime());
moveRegionsOfTable(tablename);
assertTrue(rs.next());
moveRegionsOfTable(tablename);
assertEquals(D2.getTime(), rs.getDate(1).getTime());
assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
}
}
}
private static void moveRegionsOfTable(String tableName)
throws IOException {
Admin admin = getUtility().getAdmin();
List<ServerName> servers =
new ArrayList<>(admin.getRegionServers());
ServerName server1 = servers.get(0);
ServerName server2 = servers.get(1);
List<RegionInfo> regionsOnServer1;
try {
regionsOnServer1 = admin.getRegions(server1);
} catch (IOException e) {
throw new RuntimeException(e);
}
List<RegionInfo> regionsOnServer2;
try {
regionsOnServer2 = admin.getRegions(server2);
} catch (IOException e) {
throw new RuntimeException(e);
}
regionsOnServer1.forEach(regionInfo -> {
if (regionInfo.getTable().equals(TableName.valueOf(tableName))) {
try {
admin.move(regionInfo.getEncodedNameAsBytes(), server2);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
regionsOnServer2.forEach(regionInfo -> {
if (regionInfo.getTable().equals(TableName.valueOf(tableName))) {
try {
admin.move(regionInfo.getEncodedNameAsBytes(), server1);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
private static void moveAllRegions() throws IOException {
Admin admin = getUtility().getAdmin();
List<ServerName> servers =
new ArrayList<>(admin.getRegionServers());
ServerName server1 = servers.get(0);
ServerName server2 = servers.get(1);
List<RegionInfo> regionsOnServer1;
try {
regionsOnServer1 = admin.getRegions(server1);
} catch (IOException e) {
throw new RuntimeException(e);
}
List<RegionInfo> regionsOnServer2;
try {
regionsOnServer2 = admin.getRegions(server2);
} catch (IOException e) {
throw new RuntimeException(e);
}
regionsOnServer1.forEach(regionInfo -> {
try {
admin.move(regionInfo.getEncodedNameAsBytes(), server2);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
regionsOnServer2.forEach(regionInfo -> {
try {
admin.move(regionInfo.getEncodedNameAsBytes(), server1);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
@Test
public void testLimitOffsetWithSplit() throws Exception {
hasTestStarted = true;
final String tablename = generateUniqueName();
final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l",
"m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+ "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n"
+ "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) "
+ "SPLIT ON ('e','i','o')";
TABLE_NAMES.add(tablename);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
createTestTable(getUrl(), ddl);
for (int i = 0; i < 26; i++) {
conn.createStatement().execute("UPSERT INTO " + tablename + " values('"
+ STRINGS[i] + "'," + i + ","
+ (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
}
conn.commit();
int limit = 10;
// Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
int offset = 8;
ResultSet rs;
rs = conn.createStatement()
.executeQuery("SELECT t_id from " + tablename + " order by t_id limit "
+ limit + " offset " + offset);
int i = 0;
while (i < limit) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, true);
// Testing query with offset + filter
int filterCond = 10;
rs = conn.createStatement().executeQuery(
"SELECT t_id from " + tablename + " where k2 > " + filterCond +
" order by t_id limit " + limit + " offset " + offset);
i = 0;
limit = 5;
while (i < limit) {
if (i % 4 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i,
STRINGS[offset + filterCond + i], rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, true);
limit = 35;
rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename
+ " union all SELECT t_id from "
+ tablename + " offset " + offset + " FETCH FIRST " + limit + " rows only");
i = 0;
while (i++ < STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[offset + i - 1], rs.getString(1));
}
i = 0;
while (i++ < limit - STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[i - 1], rs.getString(1));
}
// no paging when serial offset
assertServerPagingMetric(tablename, rs, true);
limit = 1;
offset = 1;
rs = conn.createStatement()
.executeQuery("SELECT k2 from " + tablename + " order by k2 desc limit "
+ limit + " offset " + offset);
moveRegionsOfTable(tablename);
assertTrue(rs.next());
assertEquals(25, rs.getInt(1));
assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
}
}
@Test
public void testLimitOffsetWithoutSplit() throws Exception {
hasTestStarted = true;
final String tablename = generateUniqueName();
final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l",
"m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+ "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n"
+ "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))";
TABLE_NAMES.add(tablename);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
createTestTable(getUrl(), ddl);
for (int i = 0; i < 26; i++) {
conn.createStatement().execute("UPSERT INTO " + tablename + " values('"
+ STRINGS[i] + "'," + i + ","
+ (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
}
conn.commit();
int limit = 10;
// Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
int offset = 8;
ResultSet rs;
rs = conn.createStatement()
.executeQuery("SELECT t_id from " + tablename + " order by t_id limit "
+ limit + " offset " + offset);
int i = 0;
while (i < limit) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, false);
// Testing query with offset + filter
int filterCond = 10;
rs = conn.createStatement().executeQuery(
"SELECT t_id from " + tablename + " where k2 > " + filterCond +
" order by t_id limit " + limit + " offset " + offset);
i = 0;
limit = 5;
while (i < limit) {
if (i % 4 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i,
STRINGS[offset + filterCond + i], rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, false);
limit = 35;
rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename
+ " union all SELECT t_id from "
+ tablename + " offset " + offset + " FETCH FIRST " + limit + " rows only");
i = 0;
while (i++ < STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[offset + i - 1], rs.getString(1));
}
i = 0;
while (i++ < limit - STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[i - 1], rs.getString(1));
}
assertServerPagingMetric(tablename, rs, false);
limit = 1;
offset = 1;
rs = conn.createStatement()
.executeQuery("SELECT k2 from " + tablename + " order by k2 desc limit "
+ limit + " offset " + offset);
moveRegionsOfTable(tablename);
assertTrue(rs.next());
assertEquals(25, rs.getInt(1));
assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
}
}
@Test
public void testLimitOffsetWithSplit2() throws Exception {
hasTestStarted = true;
final String tablename = generateUniqueName();
final String[] STRINGS = {"a_xyz", "b_xyz", "c_xyz", "d_xyz", "e_xyz", "f_xyz", "g_xyz",
"h_xyz", "i_xyz", "j_xyz", "k_xyz", "l_xyz",
"m_xyz", "n_xyz", "o_xyz", "p_xyz", "q_xyz", "r_xyz", "s_xyz", "t_xyz", "u_xyz",
"v_xyz", "w_xyz", "x_xyz", "y_xyz", "z_xyz"};
String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR,\n"
+ "k1 INTEGER,\n" + "k2 INTEGER,\n" + "C3.k3 INTEGER,\n"
+ "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id)) "
+ "SPLIT ON ('e123','i123','o123')";
TABLE_NAMES.add(tablename);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
createTestTable(getUrl(), ddl);
for (int i = 0; i < 26; i++) {
conn.createStatement().execute("UPSERT INTO " + tablename + " values('"
+ STRINGS[i] + "'," + (i + 1) * 2 + ","
+ (i + 1) * 3 + "," + (i + 2) * 2 + ",'" + STRINGS[25 - i] + "')");
}
conn.commit();
int limit = 10;
// Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
int offset = 8;
ResultSet rs;
rs = conn.createStatement()
.executeQuery("SELECT t_id from " + tablename + " order by t_id limit "
+ limit + " offset " + offset);
int i = 0;
while (i < limit) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, true);
// Testing query with offset + filter
int filterCond = 30;
rs = conn.createStatement().executeQuery(
"SELECT t_id from " + tablename + " where k2 > " + filterCond +
" order by t_id limit " + limit + " offset " + offset);
i = 0;
limit = 5;
while (i < limit) {
if (i % 4 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i,
STRINGS[offset + 10 + i], rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, true);
limit = 35;
rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename
+ " union all SELECT t_id from "
+ tablename + " offset " + offset + " FETCH FIRST " + limit + " rows only");
i = 0;
while (i++ < STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[offset + i - 1], rs.getString(1));
}
i = 0;
while (i++ < limit - STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[i - 1], rs.getString(1));
}
// no paging when serial offset
assertServerPagingMetric(tablename, rs, true);
limit = 1;
offset = 1;
rs = conn.createStatement()
.executeQuery("SELECT k2 from " + tablename + " order by k2 desc limit "
+ limit + " offset " + offset);
moveRegionsOfTable(tablename);
assertTrue(rs.next());
assertEquals(75, rs.getInt(1));
assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
}
}
@Test
public void testLimitOffsetWithoutSplit2() throws Exception {
hasTestStarted = true;
final String tablename = generateUniqueName();
final String[] STRINGS = {"a_xyz", "b_xyz", "c_xyz", "d_xyz", "e_xyz", "f_xyz", "g_xyz",
"h_xyz", "i_xyz", "j_xyz", "k_xyz", "l_xyz",
"m_xyz", "n_xyz", "o_xyz", "p_xyz", "q_xyz", "r_xyz", "s_xyz", "t_xyz", "u_xyz",
"v_xyz", "w_xyz", "x_xyz", "y_xyz", "z_xyz"};
String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR,\n"
+ "k1 INTEGER,\n" + "k2 INTEGER,\n" + "C3.k3 INTEGER,\n"
+ "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id))";
TABLE_NAMES.add(tablename);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
createTestTable(getUrl(), ddl);
for (int i = 0; i < 26; i++) {
conn.createStatement().execute("UPSERT INTO " + tablename + " values('"
+ STRINGS[i] + "'," + (i + 1) * 2 + ","
+ (i + 1) * 3 + "," + (i + 2) * 2 + ",'" + STRINGS[25 - i] + "')");
}
conn.commit();
int limit = 10;
// Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
int offset = 8;
ResultSet rs;
rs = conn.createStatement()
.executeQuery("SELECT t_id from " + tablename + " order by t_id limit "
+ limit + " offset " + offset);
int i = 0;
while (i < limit) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, false);
// Testing query with offset + filter
int filterCond = 30;
rs = conn.createStatement().executeQuery(
"SELECT t_id from " + tablename + " where k2 > " + filterCond +
" order by t_id limit " + limit + " offset " + offset);
i = 0;
limit = 5;
while (i < limit) {
if (i % 4 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals("Expected string didn't match for i = " + i,
STRINGS[offset + 10 + i], rs.getString(1));
i++;
}
assertServerPagingMetric(tablename, rs, false);
limit = 35;
rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename
+ " union all SELECT t_id from "
+ tablename + " offset " + offset + " FETCH FIRST " + limit + " rows only");
i = 0;
while (i++ < STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[offset + i - 1], rs.getString(1));
}
i = 0;
while (i++ < limit - STRINGS.length - offset) {
if (i % 3 == 0) {
moveRegionsOfTable(tablename);
}
assertTrue(rs.next());
assertEquals(STRINGS[i - 1], rs.getString(1));
}
// no paging when serial offset
assertServerPagingMetric(tablename, rs, false);
limit = 1;
offset = 1;
rs = conn.createStatement()
.executeQuery("SELECT k2 from " + tablename + " order by k2 desc limit "
+ limit + " offset " + offset);
moveRegionsOfTable(tablename);
assertTrue(rs.next());
assertEquals(75, rs.getInt(1));
assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
}
}
@Test
public void testGroupBy() throws Exception {
hasTestStarted = true;
final String tablename = generateUniqueName();
TABLE_NAMES.add(tablename);
String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+ "k1 INTEGER NOT NULL,\n"
+ "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) ";
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
createTestTable(getUrl(), ddl);
for (int i = 0; i < 8; i++) {
conn.createStatement().execute("UPSERT INTO " + tablename
+ " values('tenant1'," + i + ","
+ (i + 1) + ")");
}
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 10, 2)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 11, 2)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 12, 3)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 13, 3)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 14, 4)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 15, 4)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 16, 4)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 17, 5)");
conn.createStatement()
.execute("UPSERT INTO " + tablename + " values('tenant1', 18, 5)");
conn.commit();
TestUtil.dumpTable(conn, TableName.valueOf(tablename));
ResultSet rs =
conn.createStatement().executeQuery("SELECT k2, count(*) FROM " + tablename
+ " where t_id = 'tenant1' group by k2");
moveRegionsOfTable(tablename);
Assert.assertTrue(rs.next());
Assert.assertEquals(1, rs.getInt(1));
Assert.assertEquals(1, rs.getInt(2));
Assert.assertTrue(rs.next());
Assert.assertEquals(2, rs.getInt(1));
Assert.assertEquals(3, rs.getInt(2));
Assert.assertTrue(rs.next());
Assert.assertEquals(3, rs.getInt(1));
Assert.assertEquals(3, rs.getInt(2));
Assert.assertTrue(rs.next());
Assert.assertEquals(4, rs.getInt(1));
Assert.assertEquals(4, rs.getInt(2));
Assert.assertTrue(rs.next());
Assert.assertEquals(5, rs.getInt(1));
Assert.assertEquals(3, rs.getInt(2));
Assert.assertTrue(rs.next());
Assert.assertEquals(6, rs.getInt(1));
Assert.assertEquals(1, rs.getInt(2));
Assert.assertTrue(rs.next());
Assert.assertEquals(7, rs.getInt(1));
Assert.assertEquals(1, rs.getInt(2));
Assert.assertTrue(rs.next());
Assert.assertEquals(8, rs.getInt(1));
Assert.assertEquals(1, rs.getInt(2));
Assert.assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
}
}
@Test
public void testGroupByWithIndex() throws Exception {
hasTestStarted = true;
final String tablename = generateUniqueName();
final String indexName = generateUniqueName();
TABLE_NAMES.add(tablename);
TABLE_NAMES.add(indexName);
String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+ "k1 INTEGER NOT NULL,\n"
+ "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) ";
String indexDDl = "CREATE INDEX IF NOT EXISTS " + indexName + " ON "
+ tablename + "(k2)";
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
createTestTable(getUrl(), ddl);
createTestTable(getUrl(), indexDDl);
for (int i = 0; i < 8; i++) {
conn.createStatement().execute("UPSERT INTO " + tablename
+ " values('tenant1'," + i + ","
+ (i + 1) + ")");
}
conn.commit();
ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tablename
+ " where t_id = 'tenant1' AND (k2 IN (5,6) or k2 is null) group by k2=6");
boolean moveRegions = true;
moveRegionsOfTable(tablename);
moveRegionsOfTable(indexName);
while (rs.next()) {
if (moveRegions) {
moveRegionsOfTable(tablename);
moveRegionsOfTable(indexName);
moveRegions = false;
} else {
moveRegions = true;
}
Assert.assertEquals(1, rs.getInt(1));
}
Assert.assertFalse(rs.next());
assertServerPagingMetric(indexName, rs, true);
}
}
}