| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.end2end; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.math.BigInteger; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Timestamp; |
| import java.util.Iterator; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.execute.MutationState; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.schema.PIndexState; |
| import org.apache.phoenix.schema.PTableType; |
| import org.apache.phoenix.util.Repeat; |
| import org.apache.phoenix.util.RunUntilFailure; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.StringUtil; |
| import org.apache.phoenix.util.TestUtil; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| |
| @RunWith(RunUntilFailure.class) |
| public class MutationStateIT extends ParallelStatsDisabledIT { |
| |
    // Column/constraint clause appended to "CREATE TABLE <name>" by the tests below.
    // Defines a multi-tenant table keyed on (ORGANIZATION_ID, ENTITY_ID DESC) with a
    // DOUBLE score column and a VARCHAR tags column.
    private static final String DDL =
            " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, "
                    + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK "
                    + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE";

    // Fixed seed keeps the randomized test data reproducible across runs.
    private static final Random RAND = new Random(5);
| |
| private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { |
| PreparedStatement stmt = |
| conn.prepareStatement("upsert into " + fullTableName |
| + " (organization_id, entity_id, score) values (?,?,?)"); |
| for (int i = 0; i < 10000; i++) { |
| stmt.setString(1, "AAAA" + i); |
| stmt.setString(2, "BBBB" + i); |
| stmt.setInt(3, 1); |
| stmt.execute(); |
| } |
| } |
| |
| |
| public static String randString(int length) { |
| return new BigInteger(164, RAND).toString().substring(0, length); |
| } |
| |
| private static void mutateRandomly(final String upsertStmt, final String fullTableName, |
| final int nThreads, final int nRows, final int nIndexValues, final int batchSize, |
| final CountDownLatch doneSignal) { |
| Runnable[] runnables = new Runnable[nThreads]; |
| for (int i = 0; i < nThreads; i++) { |
| runnables[i] = new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| Connection conn = DriverManager.getConnection(getUrl()); |
| for (int i = 0; i < nRows; i++) { |
| PreparedStatement statement = conn.prepareStatement(upsertStmt); |
| int index = 0; |
| statement.setString(++index, randString(15)); |
| statement.setString(++index, randString(15)); |
| statement.setString(++index, randString(15)); |
| statement.setString(++index, randString(1)); |
| statement.setString(++index, randString(15)); |
| statement.setString(++index, randString(15)); |
| statement.setTimestamp(++index, |
| new Timestamp(System.currentTimeMillis())); |
| statement.setTimestamp(++index, |
| new Timestamp(System.currentTimeMillis())); |
| statement.setString(++index, randString(1)); |
| statement.setString(++index, randString(1)); |
| statement.setBoolean(++index, false); |
| statement.setString(++index, randString(1)); |
| statement.setString(++index, randString(1)); |
| statement.setString(++index, randString(15)); |
| statement.setString(++index, randString(15)); |
| statement.setString(++index, randString(15)); |
| statement.setInt(++index, RAND.nextInt()); |
| statement.execute(); |
| if ((i % batchSize) == 0) { |
| conn.commit(); |
| } |
| } |
| conn.commit(); |
| } catch (SQLException e) { |
| throw new RuntimeException(e); |
| } finally { |
| doneSignal.countDown(); |
| } |
| } |
| |
| }; |
| } |
| for (int i = 0; i < nThreads; i++) { |
| Thread t = new Thread(runnables[i]); |
| t.start(); |
| } |
| } |
| |
| @Test |
| @Repeat(5) |
| public void testOnlyIndexTableWriteFromClientSide() |
| throws SQLException, InterruptedException, IOException { |
| String schemaName = generateUniqueName(); |
| String tableName = generateUniqueName(); |
| String indexName1 = generateUniqueName(); |
| String indexName2 = generateUniqueName(); |
| String indexName3 = generateUniqueName(); |
| String fullTableName = SchemaUtil.getTableName(schemaName, tableName); |
| String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1); |
| String CREATE_DATA_TABLE = |
| "CREATE TABLE IF NOT EXISTS " + fullTableName + " ( \n" |
| + " USER1_ID CHAR(15) NOT NULL,\n" |
| + " ELEMENT1_ID CHAR(15) NOT NULL,\n" |
| + " ELEMENT_ID CHAR(15) NOT NULL,\n" |
| + " ELEMENT_TYPE VARCHAR(1) NOT NULL,\n" |
| + " TYPE_ID CHAR(15) NOT NULL,\n" |
| + " USER_ID CHAR(15) NOT NULL,\n" |
| + " ELEMENT4_TIME TIMESTAMP,\n" |
| + " ELEMENT_UPDATE TIMESTAMP,\n" |
| + " ELEMENT_SCORE DOUBLE,\n" |
| + " ELEMENT2_TYPE VARCHAR(1),\n" |
| + " ELEMENT1_TYPE VARCHAR(1),\n" |
| + " ELEMENT1_IS_SYS_GEN BOOLEAN,\n" |
| + " ELEMENT1_STATUS VARCHAR(1),\n" |
| + " ELEMENT1_VISIBILITY VARCHAR(1),\n" |
| + " ELEMENT3_ID CHAR(15),\n" |
| + " ELEMENT4_BY CHAR(15),\n" |
| + " BEST_ELEMENT_ID CHAR(15),\n" |
| + " ELEMENT_COUNT INTEGER,\n" |
| + " CONSTRAINT PK PRIMARY KEY\n" |
| + " (\n" + " USER1_ID,\n" |
| + " ELEMENT1_ID,\n" |
| + " ELEMENT_ID,\n" |
| + " ELEMENT_TYPE,\n" |
| + " TYPE_ID,\n" |
| + " USER_ID\n" + " )\n" |
| + " ) VERSIONS=1,MULTI_TENANT=TRUE,TTL=31536000\n"; |
| |
| String CREATE_INDEX_1 = |
| "CREATE INDEX IF NOT EXISTS " + indexName1 + " \n" |
| + " ON " + fullTableName + " (\n" |
| + " TYPE_ID,\n" |
| + " ELEMENT_ID,\n" |
| + " ELEMENT_TYPE,\n" |
| + " USER_ID,\n" |
| + " ELEMENT4_TIME DESC,\n" |
| + " ELEMENT1_ID DESC\n" |
| + " ) INCLUDE (\n" |
| + " ELEMENT2_TYPE,\n" |
| + " ELEMENT1_TYPE,\n" |
| + " ELEMENT1_IS_SYS_GEN,\n" |
| + " ELEMENT1_STATUS,\n" |
| + " ELEMENT1_VISIBILITY,\n" |
| + " ELEMENT3_ID,\n" |
| + " ELEMENT4_BY,\n" |
| + " BEST_ELEMENT_ID,\n" |
| + " ELEMENT_COUNT\n" |
| + " )\n"; |
| |
| String CREATE_INDEX_2 = |
| " CREATE INDEX IF NOT EXISTS " + indexName2 + "\n" |
| + " ON " + fullTableName + " (\n" |
| + " TYPE_ID,\n" |
| + " ELEMENT_ID,\n" |
| + " ELEMENT_TYPE,\n" |
| + " USER_ID,\n" |
| + " ELEMENT_UPDATE DESC,\n" |
| + " ELEMENT1_ID DESC\n" |
| + " ) INCLUDE (\n" |
| + " ELEMENT2_TYPE,\n" |
| + " ELEMENT1_TYPE,\n" |
| + " ELEMENT1_IS_SYS_GEN,\n" |
| + " ELEMENT1_STATUS,\n" |
| + " ELEMENT1_VISIBILITY,\n" |
| + " ELEMENT3_ID,\n" |
| + " ELEMENT4_BY,\n" |
| + " BEST_ELEMENT_ID,\n" |
| + " ELEMENT_COUNT\n" |
| + " )\n"; |
| |
| String CREATE_INDEX_3 = |
| "CREATE INDEX IF NOT EXISTS " + indexName3 + "\n" |
| + " ON " + fullTableName + " (\n" |
| + " TYPE_ID,\n" |
| + " ELEMENT_ID,\n" |
| + " ELEMENT_TYPE,\n" |
| + " USER_ID,\n" |
| + " ELEMENT_SCORE DESC,\n" |
| + " ELEMENT1_ID DESC\n" |
| + " ) INCLUDE (\n" |
| + " ELEMENT2_TYPE,\n" |
| + " ELEMENT1_TYPE,\n" |
| + " ELEMENT1_IS_SYS_GEN,\n" |
| + " ELEMENT1_STATUS,\n" |
| + " ELEMENT1_VISIBILITY,\n" |
| + " ELEMENT3_ID,\n" |
| + " ELEMENT4_BY,\n" |
| + " BEST_ELEMENT_ID,\n" |
| + " ELEMENT_COUNT\n" |
| + " )\n"; |
| |
| String UPSERT_INTO_DATA_TABLE = |
| "UPSERT INTO " + fullTableName + "\n" |
| + "(\n" + " USER1_ID,\n" |
| + " ELEMENT1_ID,\n" |
| + " ELEMENT_ID,\n" |
| + " ELEMENT_TYPE,\n" |
| + " TYPE_ID,\n" |
| + " USER_ID,\n" |
| + " ELEMENT4_TIME,\n" |
| + " ELEMENT_UPDATE,\n" |
| + " ELEMENT2_TYPE,\n" |
| + " ELEMENT1_TYPE,\n" |
| + " ELEMENT1_IS_SYS_GEN,\n" |
| + " ELEMENT1_STATUS,\n" |
| + " ELEMENT1_VISIBILITY,\n" |
| + " ELEMENT3_ID,\n" |
| + " ELEMENT4_BY,\n" |
| + " BEST_ELEMENT_ID,\n" |
| + " ELEMENT_COUNT\n" + ")" |
| + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; |
| |
| int nThreads = 1; |
| int nRows = 5000; |
| int nIndexValues = 4000; |
| int batchSize = 200; |
| final CountDownLatch doneSignal = new CountDownLatch(nThreads); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| try { |
| conn.createStatement().execute(CREATE_DATA_TABLE); |
| conn.createStatement().execute(CREATE_INDEX_1); |
| conn.createStatement().execute(CREATE_INDEX_2); |
| conn.createStatement().execute(CREATE_INDEX_3); |
| conn.commit(); |
| mutateRandomly(UPSERT_INTO_DATA_TABLE, fullTableName, nThreads, nRows, nIndexValues, |
| batchSize, doneSignal); |
| Thread.sleep(200); |
| unassignRegionAsync(fullIndexName1); |
| assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); |
| } finally { |
| |
| } |
| long dataTableRows = TestUtil.getRowCount(conn, fullTableName); |
| ResultSet rs = |
| conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), null, |
| new String[] { PTableType.INDEX.toString() }); |
| while (rs.next()) { |
| String indexState = rs.getString("INDEX_STATE"); |
| String indexName = rs.getString(3); |
| long rowCountIndex = |
| TestUtil.getRowCount(conn, SchemaUtil.getTableName(schemaName, indexName)); |
| if (indexState.equals(PIndexState.ACTIVE.name())) { |
| assertTrue(dataTableRows == rowCountIndex); |
| } else { |
| assertTrue(dataTableRows > rowCountIndex); |
| } |
| } |
| } catch (InterruptedException e) { |
| throw e; |
| } catch (IOException e) { |
| throw e; |
| } |
| } |
| |
| @Test |
| public void testDeleteMaxMutationSize() throws SQLException { |
| String tableName = generateUniqueName(); |
| int NUMBER_OF_ROWS = 20; |
| String ddl = "CREATE TABLE " + tableName + " (V BIGINT PRIMARY KEY, K BIGINT)"; |
| PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); |
| conn.createStatement().execute(ddl); |
| |
| for(int i = 0; i < NUMBER_OF_ROWS; i++) { |
| conn.createStatement().execute( |
| "UPSERT INTO " + tableName + " VALUES (" + i + ", "+ i + ")"); |
| conn.commit(); |
| } |
| |
| Properties props = new Properties(); |
| props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, |
| String.valueOf(NUMBER_OF_ROWS / 2)); |
| PhoenixConnection connection = |
| (PhoenixConnection) DriverManager.getConnection(getUrl(), props); |
| connection.setAutoCommit(false); |
| |
| try { |
| for(int i = 0; i < NUMBER_OF_ROWS; i++) { |
| connection.createStatement().execute( |
| "DELETE FROM " + tableName + " WHERE K = " + i ); |
| } |
| } catch (SQLException e) { |
| assertTrue(e.getMessage().contains( |
| SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getMessage())); |
| } |
| |
| props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "10"); |
| props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "10000"); |
| connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), props); |
| connection.setAutoCommit(false); |
| |
| try { |
| connection.createStatement().execute("DELETE FROM " + tableName ); |
| } catch (SQLException e) { |
| assertTrue(e.getMessage().contains( |
| SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getMessage())); |
| } |
| } |
| |
| @Test |
| public void testUpsertMaxMutationSize() throws Exception { |
| Properties connectionProperties = new Properties(); |
| connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3"); |
| connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000"); |
| PhoenixConnection connection = |
| (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); |
| String fullTableName = generateUniqueName(); |
| try (Statement stmt = connection.createStatement()) { |
| stmt.execute( |
| "CREATE TABLE " + fullTableName + DDL); |
| } |
| try { |
| upsertRows(connection, fullTableName); |
| fail(); |
| } catch (SQLException e) { |
| assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), |
| e.getErrorCode()); |
| assertTrue(e.getMessage().contains( |
| SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getMessage())); |
| } |
| |
| // set the max mutation size (bytes) to a low value |
| connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); |
| connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4"); |
| connection = |
| (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); |
| try { |
| upsertRows(connection, fullTableName); |
| fail(); |
| } catch (SQLException e) { |
| assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), |
| e.getErrorCode()); |
| assertTrue(e.getMessage().contains( |
| SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getMessage())); |
| } |
| } |
| |
| @Test |
| public void testMutationEstimatedSize() throws Exception { |
| PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); |
| conn.setAutoCommit(false); |
| String fullTableName = generateUniqueName(); |
| try (Statement stmt = conn.createStatement()) { |
| stmt.execute( |
| "CREATE TABLE " + fullTableName + DDL); |
| } |
| |
| // upserting rows should increase the mutation state size |
| MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); |
| long prevEstimatedSize = state.getEstimatedSize(); |
| upsertRows(conn, fullTableName); |
| assertTrue("Mutation state size should have increased", |
| state.getEstimatedSize() > prevEstimatedSize); |
| |
| |
| // after commit or rollback the size should be zero |
| conn.commit(); |
| assertEquals("Mutation state size should be zero after commit", 0, |
| state.getEstimatedSize()); |
| upsertRows(conn, fullTableName); |
| conn.rollback(); |
| assertEquals("Mutation state size should be zero after rollback", 0, |
| state.getEstimatedSize()); |
| |
| // upsert one row |
| PreparedStatement stmt = |
| conn.prepareStatement("upsert into " + fullTableName |
| + " (organization_id, entity_id, score) values (?,?,?)"); |
| stmt.setString(1, "ZZZZ"); |
| stmt.setString(2, "YYYY"); |
| stmt.setInt(3, 1); |
| stmt.execute(); |
| assertTrue("Mutation state size should be greater than zero ", state.getEstimatedSize()>0); |
| |
| prevEstimatedSize = state.getEstimatedSize(); |
| // upserting the same row twice should not increase the size |
| stmt.setString(1, "ZZZZ"); |
| stmt.setString(2, "YYYY"); |
| stmt.setInt(3, 1); |
| stmt.execute(); |
| assertEquals( |
| "Mutation state size should only increase 4 bytes (size of the new statement index)", |
| prevEstimatedSize + 4, state.getEstimatedSize()); |
| |
| prevEstimatedSize = state.getEstimatedSize(); |
| // changing the value of one column of a row to a larger value should increase the estimated size |
| stmt = |
| conn.prepareStatement("upsert into " + fullTableName |
| + " (organization_id, entity_id, score, tags) values (?,?,?,?)"); |
| stmt.setString(1, "ZZZZ"); |
| stmt.setString(2, "YYYY"); |
| stmt.setInt(3, 1); |
| stmt.setString(4, "random text string random text string random text string"); |
| stmt.execute(); |
| assertTrue("Mutation state size should increase", prevEstimatedSize+4 < state.getEstimatedSize()); |
| |
| prevEstimatedSize = state.getEstimatedSize(); |
| // changing the value of one column of a row to a smaller value should decrease the estimated size |
| stmt = |
| conn.prepareStatement("upsert into " + fullTableName |
| + " (organization_id, entity_id, score, tags) values (?,?,?,?)"); |
| stmt.setString(1, "ZZZZ"); |
| stmt.setString(2, "YYYY"); |
| stmt.setInt(3, 1); |
| stmt.setString(4, ""); |
| stmt.execute(); |
| assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize()); |
| } |
| |
| @Test |
| public void testSplitMutationsIntoSameGroupForSingleRow() throws Exception { |
| String tableName = "TBL_" + generateUniqueName(); |
| String indexName = "IDX_" + generateUniqueName(); |
| Properties props = new Properties(); |
| props.put("phoenix.mutate.batchSize", "2"); |
| try (PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class)) { |
| conn.setAutoCommit(false); |
| conn.createStatement().executeUpdate( |
| "CREATE TABLE " + tableName + " (" |
| + "A VARCHAR NOT NULL PRIMARY KEY," |
| + "B VARCHAR," |
| + "C VARCHAR," |
| + "D VARCHAR) COLUMN_ENCODED_BYTES = 0"); |
| conn.createStatement().executeUpdate("CREATE INDEX " + indexName + " on " + tableName + " (C) INCLUDE(D)"); |
| |
| conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A2','B2','C2','D2')"); |
| conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A3','B3', 'C3', null)"); |
| conn.commit(); |
| |
| Table htable = conn.getQueryServices().getTable(Bytes.toBytes(tableName)); |
| Scan scan = new Scan(); |
| scan.setRaw(true); |
| Iterator<Result> scannerIter = htable.getScanner(scan).iterator(); |
| while (scannerIter.hasNext()) { |
| long ts = -1; |
| Result r = scannerIter.next(); |
| for (Cell cell : r.listCells()) { |
| if (ts == -1) { |
| ts = cell.getTimestamp(); |
| } else { |
| assertEquals("(" + cell.toString() + ") has different ts", ts, cell.getTimestamp()); |
| } |
| } |
| } |
| htable.close(); |
| } |
| } |
| |
| } |