blob: f37d09b926baa67433c0e8aa934e397a402c8e86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.tx;
import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.apache.tephra.TxConstants;
import org.junit.Test;
public class TransactionIT extends ParallelStatsDisabledIT {
@Test
public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException {
String tableName = generateUniqueName();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(false);
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
stmt.execute("DROP TABLE " + tableName);
stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true");
stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)");
assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional());
assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName + "_IDX")).isTransactional());
}
@Test
public void testRowTimestampDisabled() throws SQLException {
String tableName = generateUniqueName();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(false);
Statement stmt = conn.createStatement();
try {
stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true");
fail();
}
catch(SQLException e) {
assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode());
}
stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))");
try {
stmt.execute("ALTER TABLE " + tableName + " SET TRANSACTIONAL=true");
fail();
}
catch(SQLException e) {
assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode());
}
}
}
@Test
public void testTransactionalTableMetadata() throws SQLException {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String transactTableName = generateUniqueName();
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " +
"TRANSACTIONAL=true");
conn.commit();
DatabaseMetaData dbmd = conn.getMetaData();
ResultSet rs = dbmd.getTables(null, null, StringUtil.escapeLike(transactTableName), null);
assertTrue(rs.next());
assertEquals("Transactional table was not marked as transactional in JDBC API.",
"true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
String nonTransactTableName = generateUniqueName();
Statement stmt2 = conn.createStatement();
stmt2.execute("CREATE TABLE " + nonTransactTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) ");
conn.commit();
ResultSet rs2 = dbmd.getTables(null, null, StringUtil.escapeLike(nonTransactTableName), null);
assertTrue(rs2.next());
assertEquals("Non-transactional table was marked as transactional in JDBC API.",
"false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
}
}
@Test
public void testOnDupKeyForTransactionalTable() throws Exception {
// TODO: we should support having a transactional table defined for a connectionless connection
try (Connection conn = DriverManager.getConnection(getUrl())) {
String transactTableName = generateUniqueName();
conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true");
conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
fail();
} catch (SQLException e) {
assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode());
}
}
@Test
public void testProperties() throws Exception {
String nonTxTableName = generateUniqueName();
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "1(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR) TTL=1000");
conn.createStatement().execute("CREATE INDEX idx1 ON " + nonTxTableName + "1(a.v, b.v) TTL=1000");
conn.createStatement().execute("CREATE INDEX idx2 ON " + nonTxTableName + "1(c.v) INCLUDE (a.v, b.v) TTL=1000");
conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "1 SET TRANSACTIONAL=true");
HTableDescriptor desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes(nonTxTableName + "1"));
for (HColumnDescriptor colDesc : desc.getFamilies()) {
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
assertEquals(1000, colDesc.getTimeToLive());
assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
}
desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX1"));
for (HColumnDescriptor colDesc : desc.getFamilies()) {
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
assertEquals(1000, colDesc.getTimeToLive());
assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
}
desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX2"));
for (HColumnDescriptor colDesc : desc.getFamilies()) {
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
assertEquals(1000, colDesc.getTimeToLive());
assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
}
conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TRANSACTIONAL=true, VERSIONS=10");
desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "2"));
for (HColumnDescriptor colDesc : desc.getFamilies()) {
assertEquals(10, colDesc.getMaxVersions());
assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
assertEquals(null, colDesc.getValue(TxConstants.PROPERTY_TTL));
}
conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TTL=1000");
desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "2"));
for (HColumnDescriptor colDesc : desc.getFamilies()) {
assertEquals(10, colDesc.getMaxVersions());
assertEquals(1000, colDesc.getTimeToLive());
assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
}
conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "3(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "3 SET TRANSACTIONAL=true, b.VERSIONS=10, c.VERSIONS=20");
desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "3"));
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, desc.getFamily(Bytes.toBytes("A")).getMaxVersions());
assertEquals(10, desc.getFamily(Bytes.toBytes("B")).getMaxVersions());
assertEquals(20, desc.getFamily(Bytes.toBytes("C")).getMaxVersions());
conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "4(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
try {
conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "4 SET TRANSACTIONAL=true, VERSIONS=1");
fail();
} catch (SQLException e) {
assertEquals(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE.getErrorCode(), e.getErrorCode());
}
try {
conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "4 SET TRANSACTIONAL=true, b.VERSIONS=1");
fail();
} catch (SQLException e) {
assertEquals(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE.getErrorCode(), e.getErrorCode());
}
conn.createStatement().execute("CREATE TABLE TX_TABLE1(k INTEGER PRIMARY KEY, v VARCHAR) TTL=1000, TRANSACTIONAL=true");
desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("TX_TABLE1"));
for (HColumnDescriptor colDesc : desc.getFamilies()) {
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
}
}
@Test
public void testColConflicts() throws Exception {
String transTableName = generateUniqueName();
String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
try (Connection conn1 = DriverManager.getConnection(getUrl());
Connection conn2 = DriverManager.getConnection(getUrl())) {
TestUtil.createTransactionalTable(conn1, fullTableName);
conn1.setAutoCommit(false);
conn2.setAutoCommit(false);
String selectSql = "SELECT * FROM "+fullTableName;
conn1.setAutoCommit(false);
ResultSet rs = conn1.createStatement().executeQuery(selectSql);
assertFalse(rs.next());
// upsert row using conn1
String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
PreparedStatement stmt = conn1.prepareStatement(upsertSql);
TestUtil.setRowKeyColumns(stmt, 1);
stmt.setInt(7, 10);
stmt.execute();
// upsert row using conn2
stmt = conn2.prepareStatement(upsertSql);
TestUtil.setRowKeyColumns(stmt, 1);
stmt.setInt(7, 11);
stmt.execute();
conn1.commit();
//second commit should fail
try {
conn2.commit();
fail();
}
catch (SQLException e) {
assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
}
}
}
@Test
public void testCheckpointAndRollback() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
String fullTableName = generateUniqueName();
conn.setAutoCommit(false);
try {
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true");
stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
conn.commit();
stmt.executeUpdate("upsert into " + fullTableName + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName);
ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
assertTrue(rs.next());
assertEquals("x", rs.getString(1));
assertEquals("aa", rs.getString(2));
assertEquals("a", rs.getString(3));
assertFalse(rs.next());
stmt.executeUpdate("upsert into " + fullTableName + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName);
rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
assertTrue(rs.next());
assertEquals("x", rs.getString(1));
assertEquals("aaa", rs.getString(2));
assertEquals("a", rs.getString(3));
assertFalse(rs.next());
conn.rollback();
//assert original row exists in fullTableName1
rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
assertTrue(rs.next());
assertEquals("x", rs.getString(1));
assertEquals("a", rs.getString(2));
assertEquals("a", rs.getString(3));
assertFalse(rs.next());
} finally {
conn.close();
}
}
}