blob: e5b0266d92298e4cdbb7c85527175eeeee889ff3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class QueryMoreIT extends ParallelStatsDisabledIT {
private final String TENANT_SPECIFIC_URL1 = getUrl() + ';' + TENANT_ID_ATTRIB + "=tenant1";
private String dataTableName;
//queryAgainstTenantSpecificView = true, dataTableSalted = true
@Test
public void testQueryMore1() throws Exception {
testQueryMore(true, true);
}
//queryAgainstTenantSpecificView = false, dataTableSalted = true
@Test
public void testQueryMore2() throws Exception {
testQueryMore(false, true);
}
//queryAgainstTenantSpecificView = false, dataTableSalted = false
@Test
public void testQueryMore3() throws Exception {
testQueryMore(false, false);
}
//queryAgainstTenantSpecificView = true, dataTableSalted = false
@Test
public void testQueryMore4() throws Exception {
testQueryMore(true, false);
}
private void testQueryMore(boolean queryAgainstTenantSpecificView, boolean dataTableSalted) throws Exception {
String[] tenantIds = new String[] {"T1_" + generateUniqueName(), "T2_" + generateUniqueName(), "T3_" + generateUniqueName()};
int numRowsPerTenant = 10;
String cursorTableName = generateUniqueName();
String base_history_table = generateUniqueName();
this.dataTableName = base_history_table + (dataTableSalted ? "_SALTED" : "");
String cursorTableDDL = "CREATE TABLE IF NOT EXISTS " +
cursorTableName + " (\n" +
"TENANT_ID VARCHAR NOT NULL\n," +
"QUERY_ID VARCHAR(15) NOT NULL,\n" +
"CURSOR_ORDER BIGINT NOT NULL \n" +
"CONSTRAINT CURSOR_TABLE_PK PRIMARY KEY (TENANT_ID, QUERY_ID, CURSOR_ORDER)) "+
"SALT_BUCKETS = 4, TTL=86400";
String baseDataTableDDL = "CREATE TABLE IF NOT EXISTS " +
dataTableName + " (\n" +
"TENANT_ID VARCHAR NOT NULL,\n" +
"PARENT_ID CHAR(15) NOT NULL,\n" +
"CREATED_DATE DATE NOT NULL,\n" +
"ENTITY_HISTORY_ID CHAR(15) NOT NULL,\n" +
"DATA_TYPE VARCHAR,\n" +
"OLDVAL_STRING VARCHAR,\n" +
"NEWVAL_STRING VARCHAR\n" +
"CONSTRAINT PK PRIMARY KEY(TENANT_ID, PARENT_ID, CREATED_DATE DESC, ENTITY_HISTORY_ID)) " +
"VERSIONS = 1, MULTI_TENANT = true" + (dataTableSalted ? ", SALT_BUCKETS = 4" : "");
//create cursor and data tables.
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute(cursorTableDDL);
conn.createStatement().execute(baseDataTableDDL);
conn.close();
//upsert rows in the data table for all the tenantIds
Map<String, List<String>> historyIdsPerTenant = createHistoryTableRows(dataTableName, tenantIds, numRowsPerTenant);
// assert query more for tenantId -> tenantIds[0]
String tenantId = tenantIds[0];
String cursorQueryId = "00TcursrqueryId";
String tableOrViewName = queryAgainstTenantSpecificView ? ("HISTORY_TABLE_" + tenantId) : dataTableName;
assertEquals(numRowsPerTenant, upsertSelectRecordsInCursorTableForTenant(tableOrViewName, queryAgainstTenantSpecificView, tenantId, cursorQueryId,
cursorTableName));
/*// assert that the data inserted in cursor table matches the data in the data table for tenantId.
String selectDataTable = "SELECT TENANT_ID, PARENT_ID, CREATED_DATE, ENTITY_HISTORY_ID FROM BASE_HISTORY_TABLE WHERE TENANT_ID = ? ";
String selectCursorTable = "SELECT TENANT_ID, PARENT_ID, CREATED_DATE, ENTITY_HISTORY_ID FROM CURSOR_TABLE (PARENT_ID CHAR(15), CREATED_DATE DATE, ENTITY_HISTORY_ID CHAR(15)) WHERE TENANT_ID = ? ";
PreparedStatement stmtData = DriverManager.getConnection(getUrl()).prepareStatement(selectDataTable);
stmtData.setString(1, tenantId);
ResultSet rsData = stmtData.executeQuery();
PreparedStatement stmtCursor = DriverManager.getConnection(getUrl()).prepareStatement(selectCursorTable);
stmtCursor.setString(1, tenantId);
ResultSet rsCursor = stmtCursor.executeQuery();
while(rsData.next() && rsCursor.next()) {
assertEquals(rsData.getString("TENANT_ID"), rsCursor.getString("TENANT_ID"));
assertEquals(rsData.getString("PARENT_ID"), rsCursor.getString("PARENT_ID"));
assertEquals(rsData.getDate("CREATED_DATE"), rsCursor.getDate("CREATED_DATE"));
assertEquals(rsData.getString("ENTITY_HISTORY_ID"), rsCursor.getString("ENTITY_HISTORY_ID"));
}
*/
Connection conn2 = DriverManager.getConnection(getUrl());
ResultSet rs = conn2.createStatement().executeQuery("SELECT count(*) from " + cursorTableName);
rs.next();
assertEquals(numRowsPerTenant, rs.getInt(1));
conn2.close();
int startOrder = 0;
int endOrder = 5;
int numRecordsThatShouldBeRetrieved = numRowsPerTenant/2; // we will test for two rounds of query more.
// get first batch of cursor ids out of the cursor table.
String[] cursorIds = getRecordsOutofCursorTable(tableOrViewName, queryAgainstTenantSpecificView, tenantId, cursorQueryId, startOrder, endOrder,
cursorTableName);
assertEquals(numRecordsThatShouldBeRetrieved, cursorIds.length);
// now query and fetch first batch of records.
List<String> historyIds = doQueryMore(queryAgainstTenantSpecificView, tenantId, tableOrViewName, cursorIds);
// assert that history ids match for this tenant
assertEquals(historyIdsPerTenant.get(tenantId).subList(startOrder, endOrder), historyIds);
// get the next batch of cursor ids out of the cursor table.
cursorIds = getRecordsOutofCursorTable(tableOrViewName, queryAgainstTenantSpecificView, tenantId, cursorQueryId, startOrder + numRecordsThatShouldBeRetrieved, endOrder + numRecordsThatShouldBeRetrieved,
cursorTableName);
assertEquals(numRecordsThatShouldBeRetrieved, cursorIds.length);
// now query and fetch the next batch of records.
historyIds = doQueryMore(queryAgainstTenantSpecificView, tenantId, tableOrViewName, cursorIds);
// assert that the history ids match for this tenant
assertEquals(historyIdsPerTenant.get(tenantId).subList(startOrder + numRecordsThatShouldBeRetrieved, endOrder+ numRecordsThatShouldBeRetrieved), historyIds);
// get the next batch of cursor ids out of the cursor table.
cursorIds = getRecordsOutofCursorTable(tableOrViewName, queryAgainstTenantSpecificView, tenantId, cursorQueryId, startOrder + 2 * numRecordsThatShouldBeRetrieved, endOrder + 2 * numRecordsThatShouldBeRetrieved,
cursorTableName);
// assert that there are no more cursorids left for this tenant.
assertEquals(0, cursorIds.length);
}
private Map<String, List<String>> createHistoryTableRows(String dataTableName, String[] tenantIds, int numRowsPerTenant) throws Exception {
String upsertDML = "UPSERT INTO " + dataTableName + " VALUES (?, ?, ?, ?, ?, ?, ?)";
Connection conn = DriverManager.getConnection(getUrl());
Map<String, List<String>> historyIdsForTenant = new HashMap<String, List<String>>();
try {
PreparedStatement stmt = conn.prepareStatement(upsertDML);
for (int j = 0; j < tenantIds.length; j++) {
List<String> historyIds = new ArrayList<String>();
for (int i = 0; i < numRowsPerTenant; i++) {
stmt.setString(1, tenantIds[j]);
String parentId = "parentId" + i;
stmt.setString(2, parentId);
stmt.setDate(3, new Date(100));
String historyId = "historyId" + i;
stmt.setString(4, historyId);
stmt.setString(5, "datatype");
stmt.setString(6, "oldval");
stmt.setString(7, "newval");
stmt.executeUpdate();
historyIds.add(historyId);
}
historyIdsForTenant.put(tenantIds[j], historyIds);
}
conn.commit();
return historyIdsForTenant;
} finally {
conn.close();
}
}
private int upsertSelectRecordsInCursorTableForTenant(String tableOrViewName, boolean queryAgainstTenantView, String tenantId, String cursorQueryId,
final String cursorTable) throws Exception {
String sequenceName = "\"" + tenantId + "_SEQ\"";
Connection conn = queryAgainstTenantView ? getTenantSpecificConnection(tenantId) : DriverManager.getConnection(getUrl());
// Create a sequence. This sequence is used to fill cursor_order column for each row inserted in the cursor table.
conn.createStatement().execute("CREATE SEQUENCE " + sequenceName + " CACHE " + Long.MAX_VALUE);
conn.setAutoCommit(true);
if (queryAgainstTenantView) {
createTenantSpecificViewIfNecessary(tableOrViewName, conn);
}
try {
String tenantIdFilter = queryAgainstTenantView ? "" : " WHERE TENANT_ID = ? ";
// Using dynamic columns, we can use the same cursor table for storing primary keys for all the tables.
String upsertSelectDML = "UPSERT INTO " + cursorTable + " " +
"(TENANT_ID, QUERY_ID, CURSOR_ORDER, PARENT_ID CHAR(15), CREATED_DATE DATE, ENTITY_HISTORY_ID CHAR(15)) " +
"SELECT ?, ?, NEXT VALUE FOR " + sequenceName + ", PARENT_ID, CREATED_DATE, ENTITY_HISTORY_ID " +
" FROM " + tableOrViewName + tenantIdFilter;
PreparedStatement stmt = conn.prepareStatement(upsertSelectDML);
stmt.setString(1, tenantId);
stmt.setString(2, cursorQueryId);
if (!queryAgainstTenantView) {
stmt.setString(3, tenantId);
}
int numRecords = stmt.executeUpdate();
return numRecords;
} finally {
try {
conn.createStatement().execute("DROP SEQUENCE " + sequenceName);
} finally {
conn.close();
}
}
}
private Connection getTenantSpecificConnection(String tenantId) throws Exception {
Properties props = new Properties();
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
return DriverManager.getConnection(getUrl(), props);
}
private String createTenantSpecificViewIfNecessary(String tenantViewName, Connection tenantConn) throws Exception {
tenantConn.createStatement().execute("CREATE VIEW IF NOT EXISTS " + tenantViewName + " AS SELECT * FROM " + dataTableName);
return tenantViewName;
}
private String[] getRecordsOutofCursorTable(String tableOrViewName, boolean queryAgainstTenantSpecificView, String tenantId, String cursorQueryId,
int startOrder, int endOrder, final String cursorTable) throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
List<String> pkIds = new ArrayList<String>();
String cols = queryAgainstTenantSpecificView ? "PARENT_ID, CREATED_DATE, ENTITY_HISTORY_ID" : "TENANT_ID, PARENT_ID, CREATED_DATE, ENTITY_HISTORY_ID";
String dynCols = queryAgainstTenantSpecificView ? "(PARENT_ID CHAR(15), CREATED_DATE DATE, ENTITY_HISTORY_ID CHAR(15))" : "(TENANT_ID CHAR(15), PARENT_ID CHAR(15), CREATED_DATE DATE, ENTITY_HISTORY_ID CHAR(15))";
String selectCursorSql = "SELECT " + cols + " " +
"FROM " + cursorTable + " \n" +
dynCols + " \n" +
"WHERE TENANT_ID = ? AND \n" +
"QUERY_ID = ? AND \n" +
"CURSOR_ORDER > ? AND \n" +
"CURSOR_ORDER <= ?";
PreparedStatement stmt = conn.prepareStatement(selectCursorSql);
stmt.setString(1, tenantId);
stmt.setString(2, cursorQueryId);
stmt.setInt(3, startOrder);
stmt.setInt(4, endOrder);
ResultSet rs = stmt.executeQuery();
@SuppressWarnings("unchecked")
List<Pair<String, String>> columns = queryAgainstTenantSpecificView ? Lists.newArrayList(new Pair<String, String>(null, "PARENT_ID"), new Pair<String, String>(null, "CREATED_DATE"), new Pair<String, String>(null, "ENTITY_HISTORY_ID")) : Lists.newArrayList(new Pair<String, String>(null, "TENANT_ID"), new Pair<String, String>(null, "PARENT_ID"), new Pair<String, String>(null, "CREATED_DATE"), new Pair<String, String>(null, "ENTITY_HISTORY_ID"));
while(rs.next()) {
Object[] values = new Object[columns.size()];
for (int i = 0; i < columns.size(); i++) {
values[i] = rs.getObject(i + 1);
}
conn = getTenantSpecificConnection(tenantId);
pkIds.add(Bytes.toString(Base64.getEncoder().encode(PhoenixRuntime.encodeColumnValues(conn, tableOrViewName.toUpperCase(), values, columns))));
}
return pkIds.toArray(new String[pkIds.size()]);
}
private List<String> doQueryMore(boolean queryAgainstTenantView, String tenantId, String tenantViewName, String[] cursorIds) throws Exception {
Connection conn = queryAgainstTenantView ? getTenantSpecificConnection(tenantId) : DriverManager.getConnection(getUrl());
String tableName = queryAgainstTenantView ? tenantViewName : dataTableName;
@SuppressWarnings("unchecked")
List<Pair<String, String>> columns = queryAgainstTenantView ? Lists.newArrayList(new Pair<String, String>(null, "PARENT_ID"), new Pair<String, String>(null, "CREATED_DATE"), new Pair<String, String>(null, "ENTITY_HISTORY_ID")) : Lists.newArrayList(new Pair<String, String>(null, "TENANT_ID"), new Pair<String, String>(null, "PARENT_ID"), new Pair<String, String>(null, "CREATED_DATE"), new Pair<String, String>(null, "ENTITY_HISTORY_ID"));
StringBuilder sb = new StringBuilder();
String where = queryAgainstTenantView ? " WHERE (PARENT_ID, CREATED_DATE, ENTITY_HISTORY_ID) IN " : " WHERE (TENANT_ID, PARENT_ID, CREATED_DATE, ENTITY_HISTORY_ID) IN ";
sb.append("SELECT ENTITY_HISTORY_ID FROM " + tableName + where);
int numPkCols = columns.size();
String query = addRvcInBinds(sb, cursorIds.length, numPkCols);
PreparedStatement stmt = conn.prepareStatement(query);
int bindCounter = 1;
for (int i = 0; i < cursorIds.length; i++) {
Object[] pkParts = PhoenixRuntime.decodeColumnValues(conn, tableName.toUpperCase(), Base64.getDecoder().decode(cursorIds[i]), columns);
for (int j = 0; j < pkParts.length; j++) {
stmt.setObject(bindCounter++, pkParts[j]);
}
}
ResultSet rs = stmt.executeQuery();
List<String> historyIds = new ArrayList<String>();
while(rs.next()) {
historyIds.add(rs.getString(1));
}
return historyIds;
}
private String addRvcInBinds(StringBuilder sb, int numRvcs, int numPkCols) {
sb.append("(");
for (int i = 0 ; i < numRvcs; i++) {
for (int j = 0; j < numPkCols; j++) {
if (j == 0) {
sb.append("(");
}
sb.append("?");
if (j < numPkCols - 1) {
sb.append(",");
} else {
sb.append(")");
}
}
if (i < numRvcs - 1) {
sb.append(",");
}
}
sb.append(")");
return sb.toString();
}
@Test // see - https://issues.apache.org/jira/browse/PHOENIX-1696
public void testSelectColumnMoreThanOnce() throws Exception {
Date date = new Date(System.currentTimeMillis());
initEntityHistoryTableValues("abcd", getDefaultSplits("abcd"), date, null);
String query = "SELECT NEW_VALUE, NEW_VALUE FROM " + TestUtil.ENTITY_HISTORY_TABLE_NAME + " LIMIT 1";
ResultSet rs = DriverManager.getConnection(getUrl()).createStatement().executeQuery(query);
assertTrue(rs.next());
rs.getObject("NEW_VALUE");
assertFalse(rs.next());
}
@SuppressWarnings("deprecation")
@Test
public void testNullBigDecimalWithScale() throws Exception {
final String table = generateUniqueName();
final Connection conn = DriverManager.getConnection(getUrl());
conn.setAutoCommit(true);
try (Statement stmt = conn.createStatement()) {
assertFalse(stmt.execute("CREATE TABLE IF NOT EXISTS " + table + " (\n" +
"PK VARCHAR(15) NOT NULL\n," +
"\"DEC\" DECIMAL,\n" +
"CONSTRAINT TABLE_PK PRIMARY KEY (PK))"));
}
try (PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table + " (PK, \"DEC\") VALUES(?, ?)")) {
stmt.setString(1, "key");
stmt.setBigDecimal(2, null);
assertFalse(stmt.execute());
assertEquals(1, stmt.getUpdateCount());
}
try (Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM " + table);
assertNotNull(rs);
assertTrue(rs.next());
assertEquals("key", rs.getString(1));
assertNull(rs.getBigDecimal(2));
assertNull(rs.getBigDecimal(2, 10));
}
}
@Test
public void testRVCOnDescWithLeadingPKEquality() throws Exception {
final Connection conn = DriverManager.getConnection(getUrl());
String fullTableName = generateUniqueName();
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
" ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
" SCORE DOUBLE NOT NULL,\n" +
" ENTITY_ID CHAR(15) NOT NULL\n" +
" CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
" ORGANIZATION_ID,\n" +
" SCORE DESC,\n" +
" ENTITY_ID DESC\n" +
" )\n" +
") MULTI_TENANT=TRUE");
}
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',3,'01')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',2,'04')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',2,'03')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',1,'02')");
conn.commit();
try (Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT entity_id, score\n" +
"FROM " + fullTableName + "\n" +
"WHERE organization_id = 'org1'\n" +
"AND (score, entity_id) < (2, '04')\n" +
"ORDER BY score DESC, entity_id DESC\n" +
"LIMIT 3");
assertTrue(rs.next());
assertEquals("03", rs.getString(1));
assertEquals(2.0, rs.getDouble(2), 0.001);
assertTrue(rs.next());
assertEquals("02", rs.getString(1));
assertEquals(1.0, rs.getDouble(2), 0.001);
assertFalse(rs.next());
}
try (Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT entity_id, score\n" +
"FROM " + fullTableName + "\n" +
"WHERE organization_id = 'org1'\n" +
"AND (organization_id, score, entity_id) < ('org1', 2, '04')\n" +
"ORDER BY score DESC, entity_id DESC\n" +
"LIMIT 3");
assertTrue(rs.next());
assertEquals("03", rs.getString(1));
assertEquals(2.0, rs.getDouble(2), 0.001);
assertTrue(rs.next());
assertEquals("02", rs.getString(1));
assertEquals(1.0, rs.getDouble(2), 0.001);
assertFalse(rs.next());
}
}
@Test
public void testSingleDescPKColumnComparison() throws Exception {
final Connection conn = DriverManager.getConnection(getUrl());
String fullTableName = generateUniqueName();
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
" ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
" SCORE DOUBLE NOT NULL,\n" +
" ENTITY_ID CHAR(15) NOT NULL\n" +
" CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
" ORGANIZATION_ID,\n" +
" SCORE DESC,\n" +
" ENTITY_ID DESC\n" +
" )\n" +
") MULTI_TENANT=TRUE");
}
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',3,'01')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',2,'04')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',2,'03')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1',1,'02')");
conn.commit();
try (Statement stmt = conn.createStatement()) {
// Even though SCORE is descending, the > comparison makes sense logically
// and doesn't have to be reversed as RVC expression comparisons do (which
// is really just a bug - see PHOENIX-3383).
final ResultSet rs = stmt.executeQuery("SELECT entity_id, score\n" +
"FROM " + fullTableName + "\n" +
"WHERE organization_id = 'org1'\n" +
"AND score > 2.0\n" +
"ORDER BY score DESC\n" +
"LIMIT 3");
assertTrue(rs.next());
assertEquals("01", rs.getString(1));
assertEquals(3.0, rs.getDouble(2), 0.001);
assertFalse(rs.next());
}
}
@Test
public void testMutationBatch() throws Exception {
Properties connectionProperties = new Properties();
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "10");
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128");
PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
String fullTableName = generateUniqueName();
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
" ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
" SCORE DOUBLE NOT NULL,\n" +
" ENTITY_ID CHAR(15) NOT NULL\n" +
" CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
" ORGANIZATION_ID,\n" +
" SCORE DESC,\n" +
" ENTITY_ID DESC\n" +
" )\n" +
") MULTI_TENANT=TRUE");
}
upsertRows(connection, fullTableName);
connection.commit();
assertEquals(2L, connection.getMutationState().getBatchCount());
// set the batch size (rows) to 2 since three are at least 2 mutations when updates a single row
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "2");
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128");
connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
upsertRows(connection, fullTableName);
connection.commit();
// each row should be in its own batch
assertEquals(2L, connection.getMutationState().getBatchCount());
}
private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName +
" (organization_id, entity_id, score) values (?,?,?)");
for (int i = 0; i < 4; i++) {
stmt.setString(1, "AAAA" + i);
stmt.setString(2, "BBBB" + i);
stmt.setInt(3, 1);
stmt.execute();
}
}
@Test public void testRVCWithDescAndAscendingPK() throws Exception {
final Connection conn = DriverManager.getConnection(getUrl());
String fullTableName = generateUniqueName();
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE TABLE " + fullTableName + "(\n"
+ " ORGANIZATION_ID CHAR(15) NOT NULL,\n" + " SCORE VARCHAR NOT NULL,\n"
+ " ENTITY_ID VARCHAR NOT NULL\n"
+ " CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n"
+ " ORGANIZATION_ID,\n" + " SCORE DESC,\n" + " ENTITY_ID\n"
+ " )\n" + ") MULTI_TENANT=TRUE");
}
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1','c','1')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1','b','3')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1','b','4')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES ('org1','a','2')");
conn.commit();
try (Statement stmt = conn.createStatement()) {
final ResultSet
rs =
stmt.executeQuery("SELECT score, entity_id \n" + "FROM " + fullTableName + "\n"
+ "WHERE organization_id = 'org1'\n"
+ "AND (score, entity_id) < ('b', '4')\n"
+ "ORDER BY score DESC, entity_id\n" + "LIMIT 3");
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals("3", rs.getString(2));
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertEquals("2", rs.getString(2));
assertFalse(rs.next());
}
}
@Test
public void testRVCOnTenantViewThroughGlobalIdxOrderByDesc() throws Exception {
String fullTableName = generateUniqueName();
String fullViewName = generateUniqueName();
String tenantView = generateUniqueName();
String indexName = generateUniqueName();
// create base table and global view using global connection
try (Connection conn = DriverManager.getConnection(getUrl())) {
Statement stmt = conn.createStatement();
stmt.execute(
"CREATE TABLE " + fullTableName + "(\n"
+ " TENANT_ID CHAR(15) NOT NULL,\n"
+ " KEY_PREFIX CHAR(3) NOT NULL,\n"
+ " CREATED_DATE DATE,\n"
+ " CREATED_BY CHAR(15),\n"
+ " SYSTEM_MODSTAMP DATE\n"
+ " CONSTRAINT PK PRIMARY KEY (\n"
+ " TENANT_ID,"
+ " KEY_PREFIX\n"
+ ")) MULTI_TENANT=TRUE");
stmt.execute("CREATE VIEW " + fullViewName + "(\n"
+ " DATE_TIME1 DATE NOT NULL,\n"
+ " TEXT2 VARCHAR,\n"
+ " DOUBLE1 DECIMAL(12, 3),\n"
+ " IS_BOOLEAN BOOLEAN,\n"
+ " RELATIONSHIP_ID CHAR(15),\n"
+ " TEXT1 VARCHAR,\n"
+ " TEXT_READ_ONLY VARCHAR,\n"
+ " JSON1 VARCHAR,\n"
+ " IP_START_ADDRESS VARCHAR,\n"
+ " CONSTRAINT PKVIEW PRIMARY KEY\n"
+ " (\n"
+ " DATE_TIME1, TEXT2, TEXT1\n"
+ " )) AS SELECT * FROM " + fullTableName
+ " WHERE KEY_PREFIX = '0CY'");
stmt.execute("CREATE INDEX " + indexName + " " + "ON " + fullViewName
+ " (TEXT1 DESC, TEXT2)\n"
+ "INCLUDE (CREATED_BY,\n"
+ " RELATIONSHIP_ID,\n"
+ " JSON1,\n"
+ " DOUBLE1,\n"
+ " IS_BOOLEAN,\n"
+ " IP_START_ADDRESS,\n"
+ " CREATED_DATE,\n"
+ " SYSTEM_MODSTAMP,\n"
+ " TEXT_READ_ONLY)");
}
// create and use an tenant specific view to write data
try (Connection viewConn = DriverManager.getConnection(TENANT_SPECIFIC_URL1)) {
Statement stmt = viewConn.createStatement();
stmt.execute("CREATE VIEW IF NOT EXISTS " + tenantView + " AS SELECT * FROM "
+ fullViewName);
viewConn.createStatement().execute("UPSERT INTO " + tenantView
+ "(DATE_TIME1, TEXT1, TEXT2) "
+ " VALUES (TO_DATE('2017-10-16 22:00:00', 'yyyy-MM-dd HH:mm:ss'), 'd', '1')");
viewConn.createStatement().execute("UPSERT INTO " + tenantView
+ "(DATE_TIME1, TEXT1, TEXT2) "
+ " VALUES (TO_DATE('2017-10-16 22:00:00', 'yyyy-MM-dd HH:mm:ss'), 'c', '2')");
viewConn.createStatement().execute("UPSERT INTO " + tenantView
+ "(DATE_TIME1, TEXT1, TEXT2) "
+ " VALUES (TO_DATE('2017-10-16 22:00:00', 'yyyy-MM-dd HH:mm:ss'), 'b', '3')");
viewConn.createStatement().execute("UPSERT INTO " + tenantView
+ "(DATE_TIME1, TEXT1, TEXT2) "
+ " VALUES (TO_DATE('2017-10-16 22:00:00', 'yyyy-MM-dd HH:mm:ss'), 'b', '4')");
viewConn.createStatement().execute("UPSERT INTO " + tenantView
+ "(DATE_TIME1, TEXT1, TEXT2) "
+ " VALUES (TO_DATE('2017-10-16 22:00:00', 'yyyy-MM-dd HH:mm:ss'), 'a', '4')");
viewConn.commit();
// query using desc order by so that the index is used
ResultSet
rs =
stmt.executeQuery("SELECT TEXT1, TEXT2 FROM " + tenantView
+ " WHERE (TEXT1, TEXT2) > ('b', '3') ORDER BY TEXT1 DESC, TEXT2");
assertTrue(rs.next());
assertEquals("d", rs.getString(1));
assertEquals("1", rs.getString(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals("2", rs.getString(2));
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals("4", rs.getString(2));
assertFalse(rs.next());
}
// validate that running query using global view gives same results
try (Connection conn = DriverManager.getConnection(getUrl())) {
ResultSet
rs =
conn.createStatement().executeQuery("SELECT TEXT1, TEXT2 FROM " + fullViewName
+ " WHERE (TEXT1, TEXT2) > ('b', '3') ORDER BY TEXT1 DESC, TEXT2");
assertTrue(rs.next());
assertEquals("d", rs.getString(1));
assertEquals("1", rs.getString(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals("2", rs.getString(2));
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals("4", rs.getString(2));
assertFalse(rs.next());
}
}
}