| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.phoenix; |
| |
| import static org.junit.Assert.assertFalse; |
| |
| import java.io.FileInputStream; |
| import java.io.InputStreamReader; |
| import java.io.Reader; |
| import java.math.BigDecimal; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Paths; |
| import java.sql.Array; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.Statement; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TimeZone; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.drill.exec.store.StoragePluginRegistry; |
| import com.google.common.collect.Maps; |
| import org.apache.drill.test.ClusterFixture; |
| import org.apache.drill.test.ClusterFixtureBuilder; |
| import org.apache.drill.test.ClusterTest; |
| import org.apache.hadoop.fs.Path; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.slf4j.LoggerFactory; |
| |
| import com.univocity.parsers.csv.CsvParser; |
| import com.univocity.parsers.csv.CsvParserSettings; |
| |
| public class PhoenixBaseTest extends ClusterTest { |
| |
| private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class); |
| |
| public final static String U_U_I_D = UUID.randomUUID().toString(); |
| private final static AtomicInteger initCount = new AtomicInteger(0); |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| TimeZone.setDefault(TimeZone.getTimeZone("UTC")); |
| PhoenixTestSuite.initPhoenix(); |
| if (PhoenixTestSuite.isRunningSuite()) { |
| PhoenixBasicsIT.testCatalogs(); |
| } |
| startDrillCluster(); |
| if (initCount.incrementAndGet() == 1) { |
| createSchema(PhoenixBasicsIT.CONN_STRING); |
| createTables(PhoenixBasicsIT.CONN_STRING); |
| createSampleData(PhoenixBasicsIT.CONN_STRING); |
| } |
| } |
| |
| @AfterClass |
| public static void tearDownCluster() throws Exception { |
| if (!PhoenixTestSuite.isRunningSuite()) { |
| PhoenixTestSuite.tearDownCluster(); |
| } |
| } |
| |
| public static void startDrillCluster() throws Exception { |
| ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); |
| startCluster(builder); |
| Map<String, Object> props = Maps.newHashMap(); |
| props.put("phoenix.query.timeoutMs", 90000); |
| props.put("phoenix.query.keepAliveMs", "30000"); |
| StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage(); |
| PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null, null, |
| PhoenixBasicsIT.CONN_STRING, null, props); |
| config.setEnabled(true); |
| registry.put(PhoenixStoragePluginConfig.NAME + "123", config); |
| dirTestWatcher.copyResourceToRoot(Paths.get("")); |
| } |
| |
| public static void createSchema(String connString) throws Exception { |
| try (final Connection connection = DriverManager.getConnection(connString)) { |
| logger.debug("Phoenix connection established with the specified url : {}", connString); |
| assertFalse(connection.isClosed()); |
| connection.setAutoCommit(true); |
| try (final Statement stmt = connection.createStatement()) { |
| assertFalse(stmt.execute("CREATE SCHEMA IF NOT EXISTS V1")); |
| } |
| } |
| } |
| |
| public static void createTables(String connString) throws Exception { |
| try (final Connection connection = DriverManager.getConnection(connString)) { |
| assertFalse(connection.isClosed()); |
| connection.setAutoCommit(true); |
| try (final Statement stmt = connection.createStatement()) { |
| String region_sql = " CREATE TABLE V1.REGION " |
| + "(" |
| + " R_REGIONKEY BIGINT not null," |
| + " R_NAME VARCHAR," |
| + " R_COMMENT VARCHAR" |
| + " CONSTRAINT REGION_PK PRIMARY KEY (R_REGIONKEY)" |
| + ")"; |
| String nation_sql = " CREATE TABLE V1.NATION " |
| + "(" |
| + " N_NATIONKEY BIGINT not null primary key," |
| + " N_NAME VARCHAR(100)," |
| + " N_REGIONKEY BIGINT," |
| + " N_COMMENT VARCHAR(255)" |
| + ")"; |
| String datatype_sql = " CREATE TABLE V1.DATATYPE " |
| + "(" |
| + " T_UUID VARCHAR not null primary key," |
| + " T_VARCHAR VARCHAR," |
| + " T_CHAR CHAR(5)," |
| + " T_BIGINT BIGINT," |
| + " T_INTEGER INTEGER," |
| + " T_SMALLINT SMALLINT," |
| + " T_TINYINT TINYINT," |
| + " T_DOUBLE DOUBLE," |
| + " T_FLOAT FLOAT," |
| + " T_DECIMAL DECIMAL(4,2)," |
| + " T_DATE DATE," |
| + " T_TIME TIME," |
| + " T_TIMESTAMP TIMESTAMP," |
| + " T_BINARY BINARY(10)," |
| + " T_VARBINARY VARBINARY," |
| + " T_BOOLEAN BOOLEAN" |
| + ")"; |
| String arrytype_sql = " CREATE TABLE V1.ARRAYTYPE " |
| + "(" |
| + " T_UUID VARCHAR not null primary key," |
| + " T_VARCHAR VARCHAR ARRAY," |
| + " T_CHAR CHAR(5) ARRAY," |
| + " T_BIGINT BIGINT ARRAY," |
| + " T_INTEGER INTEGER ARRAY," |
| + " T_DOUBLE DOUBLE ARRAY," |
| + " T_SMALLINT SMALLINT ARRAY," |
| + " T_TINYINT TINYINT ARRAY," |
| + " T_BOOLEAN BOOLEAN ARRAY" |
| + ")"; |
| assertFalse(stmt.execute(region_sql)); |
| assertFalse(stmt.execute(nation_sql)); |
| assertFalse(stmt.execute(datatype_sql)); |
| assertFalse(stmt.execute(arrytype_sql)); |
| } |
| } |
| } |
| |
| public static void createSampleData(String connString) throws Exception { |
| final String[] paths = new String[] { "data/region.tbl", "data/nation.tbl" }; |
| final String[] sqls = new String[] { |
| "UPSERT INTO V1.REGION VALUES(?,?,?)", |
| "UPSERT INTO V1.NATION VALUES(?,?,?,?)", |
| "UPSERT INTO V1.DATATYPE VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", |
| "UPSERT INTO V1.ARRAYTYPE VALUES(?,?,ARRAY['a','b','c'],?,?,?,?,?,?)", |
| }; |
| |
| Path region_path = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), paths[0]); |
| Path nation_path = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), paths[1]); |
| logger.info("Loading the .tbl file : " + Arrays.toString(paths)); |
| |
| List<String[]> allRows = parseTblFile(String.valueOf(region_path)); |
| try (final Connection connection = DriverManager.getConnection(connString)) { |
| assertFalse(connection.isClosed()); |
| connection.setAutoCommit(false); |
| // region table |
| try (final PreparedStatement pstmt = connection.prepareStatement(sqls[0])) { |
| for (String[] row : allRows) { |
| pstmt.setLong(1, Long.valueOf(row[0])); |
| pstmt.setString(2, row[1]); |
| pstmt.setString(3, row[2]); |
| pstmt.addBatch(); |
| } |
| pstmt.executeBatch(); |
| } |
| connection.commit(); |
| // nation table |
| allRows = parseTblFile(String.valueOf(nation_path)); |
| try (final PreparedStatement pstmt = connection.prepareStatement(sqls[1])) { |
| for (String[] row : allRows) { |
| pstmt.setLong(1, Long.valueOf(row[0])); |
| pstmt.setString(2, row[1]); |
| pstmt.setLong(3, Long.valueOf(row[2])); |
| pstmt.setString(4, row[3]); |
| pstmt.addBatch(); |
| } |
| pstmt.executeBatch(); |
| } |
| connection.commit(); |
| // datatype table |
| try (final PreparedStatement pstmt = connection.prepareStatement(sqls[2])) { |
| pstmt.setString(1, U_U_I_D); |
| pstmt.setString(2, "apache"); |
| pstmt.setString(3, "drill"); |
| pstmt.setLong(4, Long.MAX_VALUE); |
| pstmt.setInt(5, Integer.MAX_VALUE); |
| pstmt.setShort(6, Short.MAX_VALUE); |
| pstmt.setByte(7, Byte.MAX_VALUE); |
| pstmt.setDouble(8, Double.MAX_VALUE); |
| pstmt.setFloat(9, Float.MAX_VALUE); |
| pstmt.setBigDecimal(10, BigDecimal.valueOf(10.11)); |
| pstmt.setDate(11, java.sql.Date.valueOf("2021-12-12")); |
| pstmt.setTime(12, java.sql.Time.valueOf("12:12:12")); |
| pstmt.setTimestamp(13, java.sql.Timestamp.valueOf("2021-12-12 12:12:12")); |
| pstmt.setBytes(14, "a_b_c_d_e_".getBytes(StandardCharsets.UTF_8)); |
| pstmt.setBytes(15, "12345".getBytes(StandardCharsets.UTF_8)); |
| pstmt.setBoolean(16, Boolean.TRUE); |
| pstmt.addBatch(); |
| pstmt.executeBatch(); |
| } |
| connection.commit(); |
| // arraytype table |
| try (final PreparedStatement pstmt = connection.prepareStatement(sqls[3])) { |
| Array t_varchar = connection.createArrayOf("VARCHAR", new String[] { "apache", "drill", "1.20" }); |
| @SuppressWarnings("unused") |
| Array t_char = connection.createArrayOf("CHAR", new String[] { "a", "b", "c" }); // PHOENIX-6607 |
| Array t_bigint = connection.createArrayOf("BIGINT", new Long[] { Long.MIN_VALUE, Long.MAX_VALUE }); |
| Array t_integer = connection.createArrayOf("INTEGER", new Integer[] { Integer.MIN_VALUE, Integer.MAX_VALUE }); |
| Array t_double = connection.createArrayOf("DOUBLE", new Double[] { Double.MIN_VALUE, Double.MAX_VALUE }); |
| @SuppressWarnings("unused") |
| Array t_float = connection.createArrayOf("FLOAT", new Float[] { Float.MIN_VALUE, Float.MAX_VALUE }); // PHOENIX-6606 |
| Array t_smallint = connection.createArrayOf("SMALLINT", new Short[] { Short.MIN_VALUE, Short.MAX_VALUE }); |
| Array t_tinyint = connection.createArrayOf("TINYINT", new Byte[] { Byte.MIN_VALUE, Byte.MAX_VALUE }); |
| Array t_boolean = connection.createArrayOf("BOOLEAN", new Boolean[] { Boolean.TRUE, Boolean.FALSE }); |
| pstmt.setString(1, U_U_I_D); |
| pstmt.setArray(2, t_varchar); |
| pstmt.setArray(3, t_bigint); |
| pstmt.setArray(4, t_integer); |
| pstmt.setArray(5, t_double); |
| pstmt.setArray(6, t_smallint); |
| pstmt.setArray(7, t_tinyint); |
| pstmt.setArray(8, t_boolean); |
| pstmt.addBatch(); |
| pstmt.executeBatch(); |
| } |
| connection.commit(); |
| logger.info("Loaded {} rows.", allRows.size()); |
| } |
| } |
| |
| private static List<String[]> parseTblFile(String path) throws Exception { |
| CsvParserSettings settings = new CsvParserSettings(); |
| settings.getFormat().setDelimiter("|"); |
| settings.getFormat().setLineSeparator("\n"); |
| CsvParser parser = new CsvParser(settings); |
| |
| return parser.parseAll(getReader(path)); |
| } |
| |
| private static Reader getReader(String path) throws Exception { |
| return new InputStreamReader(new FileInputStream(path), "UTF-8"); |
| } |
| } |