blob: badc2c5e079407c75fd375b65723abd582b34738 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@Category(ParallelStatsDisabledTest.class)
@RunWith(Parameterized.class)
public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
// Index DDL template for this parameterized run. It carries two %s placeholders that
// createIndex fills with the table name; an empty string means no index is created.
private final String indexDDL;
/**
 * Stores the index DDL template supplied by the {@link #data()} parameter set.
 *
 * @param indexDDL index DDL template for this run, or an empty string for no index
 */
public OnDuplicateKeyIT(String indexDDL) {
this.indexDDL = indexDDL;
}
/**
 * Supplies the parameter sets for the runner: one single-element array per index DDL
 * template. The first entry is the empty template (no index), followed by local, global
 * and uncovered index variants.
 *
 * @return the list of one-element {@code String[]} parameter sets, in declaration order
 */
@Parameters
public static synchronized Collection<Object> data() {
    final String[] indexDdlTemplates = {
        "",
        "create local index %s_IDX on %s(counter1) include (counter2)",
        "create local index %s_IDX on %s(counter1, counter2)",
        "create index %s_IDX on %s(counter1) include (counter2)",
        "create index %s_IDX on %s(counter1, counter2)",
        "create uncovered index %s_IDX on %s(counter1)",
        "create uncovered index %s_IDX on %s(counter1, counter2)",
    };
    List<Object> parameterSets = Lists.newArrayList();
    for (String template : indexDdlTemplates) {
        // Each parameter set is a one-element array matching the single constructor argument.
        parameterSets.add(new String[] { template });
    }
    return parameterSets;
}
/**
 * Creates the index described by this run's {@code indexDDL} template on the given table.
 * Does nothing when the template is null or empty.
 *
 * @param conn connection used to execute the DDL
 * @param tableName table to index; substituted into both placeholders of the template
 * @throws SQLException if executing the index DDL fails
 */
private void createIndex(Connection conn, String tableName) throws SQLException {
    if (indexDDL == null || indexDDL.isEmpty()) {
        return;
    }
    String ddl = String.format(indexDDL, tableName, tableName);
    // Close the statement right away rather than leaving it open until the connection closes.
    try (Statement stmt = conn.createStatement()) {
        stmt.execute(ddl);
    }
}
/**
 * Executes the same ON DUPLICATE KEY UPDATE statement twice: the first execution must
 * insert the VALUES row (counter1 = 0), the second must apply the increment (counter1 = 1).
 */
@Test
public void testNewAndUpdateOnSingleNumericColumn() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 smallint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);
        String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";

        // Row does not exist yet: VALUES are used as-is.
        conn.createStatement().execute(dml);
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(0,rs.getLong(2));
        assertFalse(rs.next());

        // Row exists now: the UPDATE clause is applied instead of VALUES.
        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(1,rs.getLong(2));
        assertFalse(rs.next());
    }
}
/**
 * Exercises ON DUPLICATE KEY UPDATE expressions that read several columns of the existing
 * row (primary key parts, a defaulted column, a CHAR column) and that call built-in
 * functions, on a table with a composite primary key.
 */
@Test
public void testNewAndUpdateOnSingleNumericColumnWithOtherColumns() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(k1 varchar, k2 varchar, counter1 varchar, counter2 date, other1 char(3), other2 varchar default 'f', constraint pk primary key (k1,k2))";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);
        String dml = "UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " +
                "ON DUPLICATE KEY UPDATE counter1 = counter1 || CASE WHEN LENGTH(counter1) < 10 THEN 'SMALL' ELSE 'LARGE' END || k2 || other2 || other1 ";

        // First execution: row is new, so the VALUES (plus the column default) are stored.
        conn.createStatement().execute(dml);
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("b",rs.getString(2));
        assertEquals("c",rs.getString(3));
        assertEquals(null,rs.getDate(4));
        assertEquals("eee",rs.getString(5));
        assertEquals("f",rs.getString(6));
        assertFalse(rs.next());

        // Second execution: counter1 is still short, so the 'SMALL' branch is appended.
        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("b",rs.getString(2));
        assertEquals("cSMALLbfeee",rs.getString(3));
        assertEquals(null,rs.getDate(4));
        assertEquals("eee",rs.getString(5));
        assertEquals("f",rs.getString(6));
        assertFalse(rs.next());

        // Third execution: counter1 is now 10+ characters, so the 'LARGE' branch is appended.
        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("b",rs.getString(2));
        assertEquals("cSMALLbfeeeLARGEbfeee",rs.getString(3));
        assertEquals(null,rs.getDate(4));
        assertEquals("eee",rs.getString(5));
        assertEquals("f",rs.getString(6));
        assertFalse(rs.next());

        // Update expressions built from functions rather than from existing column values.
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " +
                "ON DUPLICATE KEY UPDATE counter1 = to_char(rand()), counter2 = current_date() + 1");
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("b",rs.getString(2));
        double d = Double.parseDouble(rs.getString(3));
        assertTrue(d >= 0.0 && d <= 1.0);
        Date date = rs.getDate(4);
        assertTrue(date.after(new Date(System.currentTimeMillis())));
        assertEquals("eee",rs.getString(5));
        assertEquals("f",rs.getString(6));
        assertFalse(rs.next());
    }
}
/**
 * Executes a string-concatenating ON DUPLICATE KEY UPDATE twice: the first execution
 * stores the VALUES row ('b'), the second appends to the stored value ('bb').
 */
@Test
public void testNewAndUpdateOnSingleVarcharColumn() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);
        String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = counter1 || 'b'";

        conn.createStatement().execute(dml);
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("b",rs.getString(2));
        assertFalse(rs.next());

        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("bb",rs.getString(2));
        assertFalse(rs.next());
    }
}
/** Runs the set-column-to-null scenario with auto-commit enabled. */
@Test
public void testDeleteOnSingleVarcharColumnAutoCommit() throws Exception {
    testDeleteOnSingleVarcharColumn(true);
}

/** Runs the set-column-to-null scenario with auto-commit disabled. */
@Test
public void testDeleteOnSingleVarcharColumnNoAutoCommit() throws Exception {
    testDeleteOnSingleVarcharColumn(false);
}

/**
 * Verifies that an ON DUPLICATE KEY UPDATE can set a column to null, and that later
 * statements in the same sequence can both null the column again and give it a value
 * while incrementing a second column.
 *
 * @param autoCommit auto-commit mode to set on the connection before running the scenario
 */
private void testDeleteOnSingleVarcharColumn(boolean autoCommit) throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.setAutoCommit(autoCommit);
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        // Insert, then null out counter1 through the duplicate-key path.
        String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = null";
        conn.createStatement().execute(dml);
        conn.createStatement().execute(dml);
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(null,rs.getString(2));
        assertFalse(rs.next());

        // Plain upsert resets the row; two conditional updates then bump counter2 twice
        // and leave counter1 at the value written by the last statement.
        dml = "UPSERT INTO " + tableName + " VALUES('a','b',0)";
        conn.createStatement().execute(dml);
        dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = null, counter2 = counter2 + 1";
        conn.createStatement().execute(dml);
        dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = 'c', counter2 = counter2 + 1";
        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("c",rs.getString(2));
        assertEquals(2,rs.getInt(3));
        assertFalse(rs.next());
    }
}
/**
 * Verifies that ON DUPLICATE KEY IGNORE leaves an existing row untouched: counter1 stays
 * at 10 after an ignored upsert that carries the value 0.
 */
@Test
public void testIgnoreOnSingleColumn() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)");
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(10,rs.getLong(2));
        assertFalse(rs.next());

        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE");
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(10,rs.getLong(2));
        assertFalse(rs.next());
    }
}
/**
 * Combines an IGNORE and an UPDATE for a brand-new row in one commit batch: the IGNORE
 * statement inserts counter1 = 10 and the UPDATE statement then increments it to 11.
 */
@Test
public void testInitialIgnoreWithUpdateOnSingleColumn() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        // Test ignore combined with update in same commit batch for new record
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY IGNORE");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.commit();

        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(11,rs.getLong(2));
        assertFalse(rs.next());
    }
}
/**
 * Verifies that a plain UPSERT issued after several ON DUPLICATE KEY UPDATE statements in
 * the same commit batch overrides them: the committed row carries the plain value 10.
 */
@Test
public void testOverrideOnDupKeyUpdateWithUpsert() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        // Test upsert overriding ON DUPLICATE KEY entries
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',2) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)");
        conn.commit();

        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(10,rs.getLong(2));
        assertFalse(rs.next());
    }
}
/** Runs the multi-update scenario with auto-commit enabled. */
@Test
public void testNewAndMultiUpdateOnSingleColumnAutoCommit() throws Exception {
    testNewAndMultiUpdateOnSingleColumn(true);
}

/** Runs the multi-update scenario with auto-commit disabled. */
@Test
public void testNewAndMultiUpdateOnSingleColumnNoAutoCommit() throws Exception {
    testNewAndMultiUpdateOnSingleColumn(false);
}

/**
 * Issues several ON DUPLICATE KEY statements against the same key before each commit and
 * checks the accumulated counter: 2 after the first group (insert 0, +1, ignore, +1) and
 * 9 after the second group (+1, +1, +1, +2, +2).
 *
 * @param autoCommit auto-commit mode to set on the connection before running the scenario
 */
private void testNewAndMultiUpdateOnSingleColumn(boolean autoCommit) throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.setAutoCommit(autoCommit);
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 integer)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',5) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE"); // no impact
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(2,rs.getLong(2));
        assertFalse(rs.next());

        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2");
        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2");
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(9,rs.getLong(2));
        assertFalse(rs.next());
    }
}
/**
 * Batches three ON DUPLICATE KEY UPDATE statements with different increments for a new
 * row in one commit: the first inserts 0, the next two add 2 and 1, giving 3.
 */
@Test
public void testNewAndMultiDifferentUpdateOnSingleColumn() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 decimal)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
        conn.createStatement().execute(dml);
        dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2";
        conn.createStatement().execute(dml);
        dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
        conn.createStatement().execute(dml);
        conn.commit();

        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(3,rs.getLong(2));
        assertFalse(rs.next());
    }
}
/** Runs the cross-column update scenario with auto-commit enabled. */
@Test
public void testNewAndMultiDifferentUpdateOnMultipleColumnsAutoCommit() throws Exception {
    testNewAndMultiDifferentUpdateOnMultipleColumns(true);
}

/** Runs the cross-column update scenario with auto-commit disabled. */
@Test
public void testNewAndMultiDifferentUpdateOnMultipleColumnsNoAutoCommit() throws Exception {
    testNewAndMultiDifferentUpdateOnMultipleColumns(false);
}

/**
 * Verifies an update clause in which each column is assigned from the other one. The
 * expected values (1,2) and then (3,3) require both right-hand sides to be computed from
 * the row's values before the statement, not from values assigned earlier in the same
 * clause. Results are checked both through the default plan and with the NO_INDEX hint.
 *
 * @param autoCommit auto-commit mode to set on the connection before running the scenario
 */
private void testNewAndMultiDifferentUpdateOnMultipleColumns(boolean autoCommit) throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.setAutoCommit(autoCommit);
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 tinyint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);
        String dml = "UPSERT INTO " + tableName + " VALUES('a',0,0) ON DUPLICATE KEY UPDATE counter1 = counter2 + 1, counter2 = counter1 + 2";

        // Insert (0,0), then one update: counter1 = 0 + 1, counter2 = 0 + 2.
        conn.createStatement().execute(dml);
        conn.createStatement().execute(dml);
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(1,rs.getLong(2));
        assertEquals(2,rs.getLong(3));
        assertFalse(rs.next());
        rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(1,rs.getLong(2));
        assertEquals(2,rs.getLong(3));
        assertFalse(rs.next());

        // One more update from (1,2): counter1 = 2 + 1, counter2 = 1 + 2.
        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(3,rs.getLong(2));
        assertEquals(3,rs.getLong(3));
        assertFalse(rs.next());
        rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(3,rs.getLong(2));
        assertEquals(3,rs.getLong(3));
        assertFalse(rs.next());
    }
}
/**
 * Runs the same increment statement concurrently from several connections and checks that
 * no increment is lost: with 10 threads, 100 commits per thread and 2 increments per
 * commit, the first statement inserts counter1 = 1 and the rest add 1 each, so the final
 * value must be 2000. When an index is configured, the query plan and the NO_INDEX result
 * are checked as well.
 */
@Test
public void testAtomicUpdate() throws Exception {
    final Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    final String tableName = generateUniqueName();
    final int nThreads = 10;
    final int nCommits = 100;
    final int nIncrementsPerCommit = 2;
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 integer, counter2 integer)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        List<Connection> workerConns = Lists.newArrayListWithExpectedSize(nThreads);
        try {
            // One dedicated connection per worker; connections are not shared across threads.
            for (int i = 0; i < nThreads; i++) {
                workerConns.add(DriverManager.getConnection(getUrl(), props));
            }
            List<Future<?>> futures = Lists.newArrayListWithExpectedSize(nThreads);
            for (final Connection myConn : workerConns) {
                futures.add(exec.submit(new Runnable() {
                    @Override
                    public void run() {
                        String dml = "UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
                        try {
                            for (int j = 0; j < nCommits; j++) {
                                for (int k = 0; k < nIncrementsPerCommit; k++) {
                                    myConn.createStatement().execute(dml);
                                }
                                myConn.commit();
                            }
                        } catch (Exception e) {
                            // Surfaces through Future.get() as the cause of an ExecutionException.
                            throw new RuntimeException(e);
                        }
                    }
                }));
            }
            Collections.shuffle(futures);
            for (Future<?> future : futures) {
                future.get();
            }
        } finally {
            // Always stop the pool and release the worker connections, including on failure.
            exec.shutdownNow();
            for (Connection workerConn : workerConns) {
                workerConn.close();
            }
        }

        int finalResult = nThreads * nCommits * nIncrementsPerCommit;
        boolean isIndexCreated = this.indexDDL != null && this.indexDDL.length() > 0;
        ResultSet rs;
        String selectSql = "SELECT * FROM " + tableName + " WHERE counter1 >= 0";
        if (isIndexCreated) {
            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
            IndexToolIT.assertExplainPlan(this.indexDDL.contains("local"), actualExplainPlan,
                    tableName, tableName + "_IDX");
        }
        rs = conn.createStatement().executeQuery(selectSql);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(finalResult,rs.getInt(2));
        assertFalse(rs.next());
        if (isIndexCreated) {
            // Cross-check the value read with the NO_INDEX hint against the one above.
            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0");
            assertTrue(rs.next());
            assertEquals("a", rs.getString(1));
            assertEquals(finalResult, rs.getInt(2));
            assertFalse(rs.next());
        }
    }
}
/**
 * Same scenario as the set-to-null test, but with quoted lower-case column names in both
 * the DDL and the ON DUPLICATE KEY UPDATE clause. No index is created for this table.
 */
@Test
public void testDeleteOnSingleLowerCaseVarcharColumn() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.setAutoCommit(false);
        String ddl = " create table " + tableName + "(pk varchar primary key, \"counter1\" varchar, \"counter2\" smallint)";
        conn.createStatement().execute(ddl);

        String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE \"counter1\" = null";
        conn.createStatement().execute(dml);
        conn.createStatement().execute(dml);
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals(null,rs.getString(2));
        assertFalse(rs.next());

        dml = "UPSERT INTO " + tableName + " VALUES('a','b',0)";
        conn.createStatement().execute(dml);
        dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE \"counter1\" = null, \"counter2\" = \"counter2\" + 1";
        conn.createStatement().execute(dml);
        dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE \"counter1\" = 'c', \"counter2\" = \"counter2\" + 1";
        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("a",rs.getString(1));
        assertEquals("c",rs.getString(2));
        assertEquals(2,rs.getInt(3));
        assertFalse(rs.next());
    }
}
/**
 * Verifies ON DUPLICATE KEY UPDATE on a salted table (SALT_BUCKETS=6) with a composite
 * primary key: the first execution inserts both counters as 0, the second increments both.
 */
@Test
public void testDuplicateUpdateWithSaltedTable() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // Exceptions propagate to JUnit unchanged so a failure reports its cause and stack
    // trace; the connection is still closed by try-with-resources.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = "create table " + tableName + " (id varchar not null,id1 varchar not null, counter1 bigint, counter2 bigint CONSTRAINT pk PRIMARY KEY (id,id1)) SALT_BUCKETS=6";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);
        String dml = "UPSERT INTO " + tableName + " (id,id1, counter1, counter2) VALUES ('abc','123', 0, 0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1, counter2 = counter2 + 1";

        conn.createStatement().execute(dml);
        conn.commit();
        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("0",rs.getString(3));
        assertEquals("0",rs.getString(4));

        conn.createStatement().execute(dml);
        conn.commit();
        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
        assertTrue(rs.next());
        assertEquals("1",rs.getString(3));
        assertEquals("1",rs.getString(4));
    }
}
/**
 * Regression scenario: a row inserted through ON DUPLICATE KEY UPDATE with a NULL counter1
 * must not be returned by a filter that requires counter1 = 1. Only row 'b' may match.
 */
@Test
public void testRowsCreatedViaUpsertOnDuplicateKeyShouldNotBeReturnedInQueryIfNotMatched() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 smallint)";
        conn.createStatement().execute(ddl);
        createIndex(conn, tableName);

        // The data has to be specifically starting with null for the first counter to fail the test. If you reverse the values, the test passes.
        String dml1 = "UPSERT INTO " + tableName + " VALUES('a',NULL,2) ON DUPLICATE KEY UPDATE " +
                "counter1 = CASE WHEN (counter1 IS NULL) THEN NULL ELSE counter1 END, " +
                "counter2 = CASE WHEN (counter1 IS NULL) THEN 2 ELSE counter2 END";
        conn.createStatement().execute(dml1);
        conn.commit();
        String dml2 = "UPSERT INTO " + tableName + " VALUES('b',1,2) ON DUPLICATE KEY UPDATE " +
                "counter1 = CASE WHEN (counter1 IS NULL) THEN 1 ELSE counter1 END, " +
                "counter2 = CASE WHEN (counter1 IS NULL) THEN 2 ELSE counter2 END";
        conn.createStatement().execute(dml2);
        conn.commit();

        // The simpler filter "counter2 = 2 AND counter1 = 1" passed even while the bug was
        // present. The OR form below should be equivalent, but it used to select both rows.
        ResultSet rs = conn.createStatement().executeQuery("SELECT pk, counter1, counter2 FROM " + tableName + " WHERE counter2 = 2 AND (counter1 = 1 OR counter1 = 1)");
        assertTrue(rs.next());
        assertEquals("b",rs.getString(1));
        assertEquals(1,rs.getLong(2));
        assertEquals(2,rs.getLong(3));
        assertFalse(rs.next());
    }
}
/**
 * Mixes plain UPSERTs and ON DUPLICATE KEY UPDATE statements for the same key inside one
 * commit batch, in three phases: row absent, row present, and partial-column statements.
 */
@Test
public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
    final String table = generateUniqueName();
    final Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
        connection.createStatement().execute(
                "create table " + table + "(pk varchar primary key, counter1 bigint, counter2 varchar)");
        createIndex(connection, table);

        // Phase 1: row doesn't exist. The plain upsert inserts it, then the update adds 2.
        String insertNew = String.format("UPSERT INTO %s VALUES('a',0,'abc')", table);
        String bumpNew = String.format(
                "UPSERT INTO %s VALUES('a',1,'zzz') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2", table);
        connection.createStatement().execute(insertNew);
        connection.createStatement().execute(bumpNew);
        connection.commit();
        assertRow(connection, table, "a", 2, "abc");

        // Phase 2: row exists. The plain upsert overwrites it, then the update is applied on top.
        String overwrite = String.format("UPSERT INTO %s VALUES('a', 7, 'fff')", table);
        String bumpAndAppend = String.format(
                "UPSERT INTO %s VALUES('a',1, 'bazz') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2," +
                        "counter2 = counter2 || 'ddd'", table);
        connection.createStatement().execute(overwrite);
        connection.createStatement().execute(bumpAndAppend);
        connection.commit();
        assertRow(connection, table, "a", 9, "fffddd");

        // Phase 3: partial update. Both statements name only pk and counter2.
        String partialBump = String.format(
                "UPSERT INTO %s (pk, counter2) VALUES('a', 'gggg') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2", table);
        String partialOverwrite = String.format(
                "UPSERT INTO %s (pk, counter2) VALUES ('a', 'bar')", table);
        connection.createStatement().execute(partialBump);
        connection.createStatement().execute(partialOverwrite);
        connection.commit();
        assertRow(connection, table, "a", 11, "bar");
    }
}
/**
 * Commits a row with both counters at 0, then batches four conditional updates that
 * alternate between the two counters in a single commit. The expected result is
 * counter1 = 101 and counter2 = 202.
 */
@Test
public void testMultiplePartialUpdatesInSameBatch() throws Exception {
    final String table = generateUniqueName();
    final Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
        connection.createStatement().execute(
                "create table " + table + "(pk varchar primary key, counter1 bigint, counter2 bigint)");
        createIndex(connection, table);

        // first commit
        connection.createStatement().execute(String.format("UPSERT INTO %s VALUES('a',0,0)", table));
        connection.commit();

        // batch multiple conditional updates (partial) in a single batch
        final String[] conditionalUpserts = {
            "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1",
            "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter2 = counter2 + 2",
            "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter1 = counter1 + 100",
            "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter2 = counter2 + 200",
        };
        for (String template : conditionalUpserts) {
            connection.createStatement().execute(String.format(template, table));
        }
        connection.commit();

        String query = String.format("SELECT counter1, counter2 FROM %s WHERE counter1 > 0", table);
        ResultSet row = connection.createStatement().executeQuery(query);
        assertTrue(row.next());
        assertEquals(101, row.getInt(1));
        assertEquals(202, row.getInt(2));
    }
}
/**
 * Applies an update clause that adds counter2 to counter1 and derives the approval level
 * from counter1 through a CASE expression. The expected approval values ('NONE' after
 * the first update, 'MANAGER_APPROVAL' after the second) correspond to counter1 as it was
 * before each update.
 */
@Test
public void testComplexDuplicateKeyExpression() throws Exception {
    final String table = generateUniqueName();
    final Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
        connection.createStatement().execute("create table " + table +
                "(pk varchar primary key, counter1 bigint, counter2 bigint, approval varchar)");
        createIndex(connection, table);

        connection.createStatement().execute(
                String.format("UPSERT INTO %s VALUES('abc', 0, 100, 'NONE')", table));
        connection.commit();

        final String conditionalUpsert = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
                "ON DUPLICATE KEY UPDATE " +
                "counter1 = counter1 + counter2," +
                "approval = CASE WHEN counter1 < 100 THEN 'NONE' " +
                "WHEN counter1 < 1000 THEN 'MANAGER_APPROVAL' " +
                "ELSE 'VP_APPROVAL' END", table);
        final String selectAll = "SELECT * from " + table;

        connection.createStatement().execute(conditionalUpsert);
        connection.commit();
        assertComplexExpressionRow(connection, selectAll, 100, "NONE");

        connection.createStatement().execute(conditionalUpsert);
        connection.commit();
        assertComplexExpressionRow(connection, selectAll, 200, "MANAGER_APPROVAL");
    }
}

// Checks the first row returned by the query: pk 'abc', counter2 100, and the given
// counter1 and approval values.
private void assertComplexExpressionRow(Connection connection, String query, int expectedCounter1,
        String expectedApproval) throws SQLException {
    ResultSet row = connection.createStatement().executeQuery(query);
    assertTrue(row.next());
    assertEquals("abc", row.getString("pk"));
    assertEquals(expectedCounter1, row.getInt("counter1"));
    assertEquals(100, row.getInt("counter2"));
    assertEquals(expectedApproval, row.getString("approval"));
}
/**
 * Exercises ON DUPLICATE KEY IGNORE / UPDATE on a table whose primary key ends in a
 * ROW_TIMESTAMP column. Runs only for the no-index parameterization; for every other
 * parameter set the method returns immediately.
 */
@Test
public void testRowStampCol() throws Exception {
    // ROW_TIMESTAMP is not supported for tables with indexes
    if (!indexDDL.isEmpty()) {
        return;
    }
    final String table = generateUniqueName();
    final Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
        final String createTable = "create table " + table +
                "(\n" +
                "ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
                "USER_ID CHAR(15) NOT NULL,\n" +
                "TIME_STAMP DATE NOT NULL,\n" +
                "STATUS VARCHAR,\n" +
                "CONSTRAINT PK PRIMARY KEY \n" +
                " (\n" +
                " ORGANIZATION_ID, \n" +
                " USER_ID,\n" +
                " TIME_STAMP ROW_TIMESTAMP\n" + // ROW_TIMESTAMP col
                " ) \n" +
                ")\n";
        connection.createStatement().execute(createTable);

        final String orgId = "ORG1";
        final String userId = "USER1";
        final String statusOriginal = "ORIGINAL";
        final String statusUpdated = "UPDATED";
        final String statusDuplicate = "DUPLICATE";
        final long rowTs = EnvironmentEdgeManager.currentTimeMillis() - 10;

        final String plainUpsert = "UPSERT INTO " + table +
                "(ORGANIZATION_ID, USER_ID, TIME_STAMP, STATUS) VALUES (?, ?, ?, ?)";
        final String ignoreUpsert = plainUpsert + "ON DUPLICATE KEY IGNORE";
        final String updateUpsert = plainUpsert + "ON DUPLICATE KEY UPDATE status='" + statusDuplicate + "'";
        final String nullUpsert = plainUpsert + "ON DUPLICATE KEY UPDATE status = null";
        final String countByStatus = "SELECT count(*) from " + table + " WHERE STATUS = ?";
        final String countNullStatus = "SELECT count(*) from " + table + " WHERE STATUS is null";

        // row doesn't exist
        upsertRecord(connection, ignoreUpsert, orgId, userId, rowTs, statusOriginal);
        assertNumRecords(1, connection, countByStatus, statusOriginal);
        assertHBaseRowTimestamp(table, rowTs);

        // on duplicate key ignore
        upsertRecord(connection, ignoreUpsert, orgId, userId, rowTs, statusUpdated);
        assertNumRecords(1, connection, countByStatus, statusOriginal);
        assertNumRecords(0, connection, countByStatus, statusUpdated);
        assertHBaseRowTimestamp(table, rowTs);

        // regular upsert override
        upsertRecord(connection, plainUpsert, orgId, userId, rowTs, statusUpdated);
        assertNumRecords(0, connection, countByStatus, statusOriginal);
        assertNumRecords(1, connection, countByStatus, statusUpdated);
        assertHBaseRowTimestamp(table, rowTs);

        // on duplicate key update generates extra mutations on the server but those mutations
        // don't honor ROW_TIMESTAMP, so the HBase cell timestamp is not checked from here on
        upsertRecord(connection, updateUpsert, orgId, userId, rowTs, "");
        assertNumRecords(0, connection, countByStatus, statusUpdated);
        assertNumRecords(1, connection, countByStatus, statusDuplicate);

        // set null, new mutations generated on the server
        upsertRecord(connection, nullUpsert, orgId, userId, rowTs, "");
        assertNumRecords(0, connection, countByStatus, statusDuplicate);
        assertNumRecords(1, connection, countNullStatus);
    }
}
/**
 * Asserts that the table holds exactly one row and that its first three columns match the
 * expected values.
 *
 * @param conn connection used to run the query
 * @param tableName table to read with {@code SELECT *}
 * @param expectedPK expected value of column 1, read as a string
 * @param expectedCol1 expected value of column 2, read as an int
 * @param expectedCol2 expected value of column 3, read as a string
 * @throws SQLException if the query fails
 */
private void assertRow(Connection conn, String tableName, String expectedPK, int expectedCol1, String expectedCol2) throws SQLException {
    // Close the statement and result set here instead of leaving them to the connection.
    try (Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) {
        assertTrue(rs.next());
        assertEquals(expectedPK,rs.getString(1));
        assertEquals(expectedCol1,rs.getInt(2));
        assertEquals(expectedCol2,rs.getString(3));
        assertFalse(rs.next());
    }
}
/**
 * Binds the four positional parameters of the given upsert statement (organization id,
 * user id, timestamp as a SQL date, status), executes it and commits.
 *
 * @param conn connection to prepare the statement on and to commit
 * @param dml upsert statement with four {@code ?} placeholders
 * @param orgid value for placeholder 1
 * @param userid value for placeholder 2
 * @param ts epoch milliseconds bound to placeholder 3 as a {@link Date}
 * @param status value for placeholder 4
 * @throws SQLException if preparing, executing or committing fails
 */
private void upsertRecord(Connection conn, String dml, String orgid, String userid, long ts, String status) throws SQLException {
    final Date rowDate = new Date(ts);
    try (PreparedStatement upsert = conn.prepareStatement(dml)) {
        upsert.setString(1, orgid);
        upsert.setString(2, userid);
        upsert.setDate(3, rowDate);
        upsert.setString(4, status);
        upsert.executeUpdate();
        // Commit while the statement is still open, as each caller expects the write to be visible.
        conn.commit();
    }
}
/**
 * Runs a count query and asserts that the first column of its first row equals the
 * expected count.
 *
 * @param count expected value of the first result column
 * @param conn connection used to prepare and run the query
 * @param dql query text; may contain {@code ?} placeholders
 * @param params string values bound to the placeholders in order, starting at index 1
 * @throws Exception if preparing or executing the query fails
 */
private void assertNumRecords(int count, Connection conn, String dql, String... params)
        throws Exception {
    // Close the statement and result set on every path, matching upsertRecord.
    try (PreparedStatement stmt = conn.prepareStatement(dql)) {
        int counter = 1;
        for (String param : params) {
            stmt.setString(counter++, param);
        }
        try (ResultSet rs = stmt.executeQuery()) {
            assertTrue(rs.next());
            assertEquals(count, rs.getInt(1));
        }
    }
}
/**
 * Scans the underlying HBase table directly and asserts that the latest empty-key-value
 * cell of the first returned row carries the expected timestamp.
 *
 * @param tableName name of the HBase table to scan
 * @param expectedTimestamp expected cell timestamp
 * @throws Exception if the HBase connection or scan fails
 */
private void assertHBaseRowTimestamp(String tableName, long expectedTimestamp) throws Exception {
    Scan scan = new Scan();
    byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst();
    // Table and ResultScanner are closed along with the connection on every path.
    try (org.apache.hadoop.hbase.client.Connection hconn =
                 ConnectionFactory.createConnection(config);
            Table table = hconn.getTable(TableName.valueOf(tableName));
            ResultScanner resultScanner = table.getScanner(scan)) {
        Result result = resultScanner.next();
        // Fail with a clear message rather than a NullPointerException on an empty scan.
        assertTrue("Expected at least one row in HBase table " + tableName, result != null);
        long actualTimestamp = result.getColumnLatestCell(
                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
        assertEquals(expectedTimestamp, actualTimestamp);
    }
}
}