blob: febae90368cb55c1edb1fa48ff429aa4b302cc25 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you maynot use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicablelaw or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.pig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.junit.Test;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
/**
*
* Test class to run all the integration tests against a virtual map reduce cluster.
*/
public class PhoenixHBaseLoaderIT extends BasePigIT {
private static final Logger LOG = LoggerFactory.getLogger(PhoenixHBaseLoaderIT.class);
private static final String SCHEMA_NAME = "T";
private static final String TABLE_NAME = "A";
private static final String INDEX_NAME = "I";
private static final String TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, TABLE_NAME);
private static final String CASE_SENSITIVE_TABLE_NAME = SchemaUtil.getEscapedArgument("a");
private static final String CASE_SENSITIVE_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME,CASE_SENSITIVE_TABLE_NAME);
/**
* Validates the schema returned for a table with Pig data types.
* @throws Exception
*/
@Test
public void testSchemaForTable() throws Exception {
final String TABLE = "TABLE1";
final String ddl = String.format("CREATE TABLE %s "
+ " (a_string varchar not null, a_binary varbinary not null, a_integer integer, cf1.a_float float"
+ " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n", TABLE);
conn.createStatement().execute(ddl);
conn.commit();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
final Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(4, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("a_string"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("a_binary"));
assertTrue(fields.get(1).type == DataType.BYTEARRAY);
assertTrue(fields.get(2).alias.equalsIgnoreCase("a_integer"));
assertTrue(fields.get(2).type == DataType.INTEGER);
assertTrue(fields.get(3).alias.equalsIgnoreCase("a_float"));
assertTrue(fields.get(3).type == DataType.FLOAT);
}
/**
* Validates the schema returned when specific columns of a table are given as part of LOAD .
* @throws Exception
*/
@Test
public void testSchemaForTableWithSpecificColumns() throws Exception {
//create the table
final String TABLE = "TABLE2";
final String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
final String selectColumns = "ID,NAME";
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
TABLE, selectColumns, zkQuorum));
Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(2, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("ID"));
assertTrue(fields.get(0).type == DataType.INTEGER);
assertTrue(fields.get(1).alias.equalsIgnoreCase("NAME"));
assertTrue(fields.get(1).type == DataType.CHARARRAY);
}
/**
* Validates the schema returned when a SQL SELECT query is given as part of LOAD .
* @throws Exception
*/
@Test
public void testSchemaForQuery() throws Exception {
//create the table.
final String TABLE = "TABLE3";
String ddl = String.format("CREATE TABLE " + TABLE +
" (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+ " CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", TABLE);
conn.createStatement().execute(ddl);
//sql query for LOAD
final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE;
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
sqlQuery, zkQuorum));
//assert the schema.
Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(3, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("a_string"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("a_integer"));
assertTrue(fields.get(1).type == DataType.INTEGER);
assertTrue(fields.get(2).alias.equalsIgnoreCase("a_double"));
assertTrue(fields.get(2).type == DataType.DOUBLE);
}
/**
* Validates the schema when it is given as part of LOAD..AS
* @throws Exception
*/
@Test
public void testSchemaForTableWithAlias() throws Exception {
//create the table.
final String TABLE = "S.TABLE4";
String ddl = "CREATE TABLE " + TABLE
+ " (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+ " CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL)) \n";
conn.createStatement().execute(ddl);
//select query given as part of LOAD.
final String sqlQuery = "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE;
LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
pigServer.registerQuery(String.format(
"raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
sqlQuery, zkQuorum));
//test the schema.
Schema schema = pigServer.dumpSchema("raw");
List<FieldSchema> fields = schema.getFields();
assertEquals(4, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("a"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("b"));
assertTrue(fields.get(1).type == DataType.BIGDECIMAL);
assertTrue(fields.get(2).alias.equalsIgnoreCase("c"));
assertTrue(fields.get(2).type == DataType.INTEGER);
assertTrue(fields.get(3).alias.equalsIgnoreCase("d"));
assertTrue(fields.get(3).type == DataType.DOUBLE);
}
/**
* @throws Exception
*/
@Test
public void testDataForTable() throws Exception {
//create the table
String ddl = "CREATE TABLE " + CASE_SENSITIVE_TABLE_FULL_NAME
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + CASE_SENSITIVE_TABLE_FULL_NAME + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
stmt.execute();
}
conn.commit();
//load data and filter rows whose age is > 25
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", CASE_SENSITIVE_TABLE_FULL_NAME,
zkQuorum));
pigServer.registerQuery("B = FILTER A BY AGE > 25;");
final Iterator<Tuple> iterator = pigServer.openIterator("B");
int recordsRead = 0;
while (iterator.hasNext()) {
final Tuple each = iterator.next();
assertEquals(3, each.size());
recordsRead++;
}
assertEquals(rows/2, recordsRead);
}
/**
* @throws Exception
*/
@Test
public void testDataForSQLQuery() throws Exception {
//create the table
String ddl = "CREATE TABLE " + TABLE_FULL_NAME
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
stmt.execute();
}
conn.commit();
//sql query
final String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
//load data and filter rows whose age is > 25
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
int recordsRead = 0;
while (iterator.hasNext()) {
iterator.next();
recordsRead++;
}
assertEquals(rows/2, recordsRead);
}
/**
*
* @throws Exception
*/
@Test
public void testForNonPKSQLQuery() throws Exception {
//create the table
final String TABLE = "TABLE5";
String ddl = "CREATE TABLE " + TABLE
+ " ( ID VARCHAR PRIMARY KEY, FOO VARCHAR, BAR INTEGER, BAZ UNSIGNED_INT)";
conn.createStatement().execute(ddl);
//upsert data.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?) ";
PreparedStatement stmt = conn.prepareStatement(dml);
stmt.setString(1, "a");
stmt.setString(2, "a");
stmt.setInt(3,-1);
stmt.setInt(4,1);
stmt.execute();
stmt.setString(1, "b");
stmt.setString(2, "b");
stmt.setInt(3,-2);
stmt.setInt(4,2);
stmt.execute();
conn.commit();
//sql query
final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE);
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
int recordsRead = 0;
while (iterator.hasNext()) {
final Tuple tuple = iterator.next();
assertEquals("a", tuple.get(0));
assertEquals(1, tuple.get(1));
recordsRead++;
}
assertEquals(1, recordsRead);
//test the schema. Test for PHOENIX-1123
Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(2, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("FOO"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("BAZ"));
assertTrue(fields.get(1).type == DataType.INTEGER);
}
/**
* @throws Exception
*/
@Test
public void testGroupingOfDataForTable() throws Exception {
//create the table
final String TABLE = "TABLE6";
String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
conn.createStatement().execute(ddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
int j = 0, k = 0;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
if(i % 2 == 0) {
stmt.setInt(3, 25);
stmt.setInt(4, 10 * 2 * j++);
} else {
stmt.setInt(3, 30);
stmt.setInt(4, 10 * 3 * k++);
}
stmt.execute();
}
conn.commit();
//prepare the mock storage with expected output
final Data data = Storage.resetData(pigServer);
List<Tuple> expectedList = new ArrayList<Tuple>();
expectedList.add(Storage.tuple(0,180));
expectedList.add(Storage.tuple(0,270));
//load data and filter rows whose age is > 25
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
pigServer.registerQuery("C = FOREACH B GENERATE MIN(A.SAL),MAX(A.SAL);");
pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
pigServer.executeBatch();
List<Tuple> actualList = data.get("out");
assertEquals(expectedList, actualList);
}
@Test
public void testTimestampForSQLQuery() throws Exception {
//create the table
String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
conn.createStatement().execute(ddl);
final String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))";
conn.createStatement().execute(dml);
conn.commit();
//sql query
final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T ";
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
assertEquals("foo", tuple.get(0));
assertEquals(2006, tuple.get(1));
}
}
@Test
public void testDateForSQLQuery() throws Exception {
//create the table
String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
conn.createStatement().execute(ddl);
final String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))";
conn.createStatement().execute(dml);
conn.commit();
//sql query
final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T ";
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
assertEquals("foo", tuple.get(0));
assertEquals(10, tuple.get(1));
}
}
@Test
public void testTimeForSQLQuery() throws Exception {
//create the table
String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
conn.createStatement().execute(ddl);
final String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))";
conn.createStatement().execute(dml);
conn.commit();
//sql query
final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T ";
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
assertEquals("foo", tuple.get(0));
assertEquals(30, tuple.get(1));
}
}
/**
* Tests both {@link PhoenixHBaseLoader} and {@link PhoenixHBaseStorage}
* @throws Exception
*/
@Test
public void testLoadAndStore() throws Exception {
//create the tables
final String TABLE = "TABLE7";
final String sourceTableddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
final String targetTable = "AGGREGATE";
final String targetTableddl = "CREATE TABLE " + targetTable
+ "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) ";
conn.createStatement().execute(sourceTableddl);
conn.createStatement().execute(targetTableddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
int j = 0, k = 0;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
if(i % 2 == 0) {
stmt.setInt(3, 25);
stmt.setInt(4, 10 * 2 * j++);
} else {
stmt.setInt(3, 30);
stmt.setInt(4, 10 * 3 * k++);
}
stmt.execute();
}
conn.commit();
//load data and filter rows whose age is > 25
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
pigServer.registerQuery("C = FOREACH B GENERATE group as AGE,MIN(A.SAL),MAX(A.SAL);");
pigServer.registerQuery("STORE C INTO 'hbase://" + targetTable
+ "' using " + PhoenixHBaseStorage.class.getName() + "('"
+ zkQuorum + "', '-batchSize 1000');");
pigServer.executeBatch();
//validate the data with what is stored.
final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE";
final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
assertTrue(rs.next());
assertEquals(25, rs.getInt("AGE"));
assertEquals(0, rs.getInt("MIN_SAL"));
assertEquals(180, rs.getInt("MAX_SAL"));
assertTrue(rs.next());
assertEquals(30, rs.getInt("AGE"));
assertEquals(0, rs.getInt("MIN_SAL"));
assertEquals(270, rs.getInt("MAX_SAL"));
}
/**
* Test for Sequence
* @throws Exception
*/
@Test
public void testDataForSQLQueryWithSequences() throws Exception {
//create the table
final String TABLE = "TABLE8";
String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
String sequenceDdl = "CREATE SEQUENCE my_sequence";
conn.createStatement().execute(sequenceDdl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
stmt.execute();
}
conn.commit();
//sql query load data and filter rows whose age is > 25
final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE + " WHERE AGE > 25";
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
Iterator<Tuple> iterator = pigServer.openIterator("A");
int recordsRead = 0;
while (iterator.hasNext()) {
iterator.next();
recordsRead++;
}
assertEquals(rows/2, recordsRead);
}
@Test
public void testDataForSQLQueryWithFunctions() throws Exception {
//create the table
final String TABLE = "TABLE9";
String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR) ";
conn.createStatement().execute(ddl);
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.execute();
}
conn.commit();
//sql query
final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE + " ORDER BY ID" ;
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
Iterator<Tuple> iterator = pigServer.openIterator("A");
int i = 0;
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
String name = (String)tuple.get(0);
assertEquals("A" + i, name);
i++;
}
}
@Test
public void testDataFromIndexTable() throws Exception {
//create the table
String ddl = "CREATE TABLE " + TABLE_NAME
+ " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true";
conn.createStatement().execute(ddl);
//create a index table
String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) ";
conn.createStatement().execute(indexDdl);
//upsert the data.
final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, i * 5);
stmt.execute();
}
conn.commit();
pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "') ;");
Iterator<Tuple> iterator = pigServer.openIterator("A");
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
assertEquals("a5", tuple.get(0));
assertEquals(25, tuple.get(1));
}
}
@Test
public void testLoadOfSaltTable() throws Exception {
final String TABLE = "TABLE11";
final String sourceTableddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) SALT_BUCKETS=2 ";
conn.createStatement().execute(sourceTableddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
int j = 0, k = 0;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
if(i % 2 == 0) {
stmt.setInt(3, 25);
stmt.setInt(4, 10 * 2 * j++);
} else {
stmt.setInt(3, 30);
stmt.setInt(4, 10 * 3 * k++);
}
stmt.execute();
}
conn.commit();
final Data data = Storage.resetData(pigServer);
List<Tuple> expectedList = new ArrayList<Tuple>();
expectedList.add(Storage.tuple(25,10));
expectedList.add(Storage.tuple(30,10));
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
pigServer.registerQuery("C = FOREACH B GENERATE group,COUNT(A);");
pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
pigServer.executeBatch();
List<Tuple> actualList = data.get("out");
assertEquals(expectedList.size(), actualList.size());
}
/**
*
* @throws Exception
*/
@Test
public void testLoadForArrayWithQuery() throws Exception {
//create the table
final String TABLE = "TABLE14";
String ddl = "CREATE TABLE " + TABLE
+ " ( ID INTEGER PRIMARY KEY, a_double_array double array[] , a_varchar_array varchar array, a_concat_str varchar, sep varchar)";
conn.createStatement().execute(ddl);
Double[] doubleArr = new Double[3];
doubleArr[0] = 2.2;
doubleArr[1] = 4.4;
doubleArr[2] = 6.6;
Array doubleArray = conn.createArrayOf("DOUBLE", doubleArr);
Tuple doubleArrTuple = Storage.tuple(2.2d, 4.4d, 6.6d);
Double[] doubleArr2 = new Double[2];
doubleArr2[0] = 12.2;
doubleArr2[1] = 22.2;
Array doubleArray2 = conn.createArrayOf("DOUBLE", doubleArr2);
Tuple doubleArrTuple2 = Storage.tuple(12.2d, 22.2d);
String[] strArr = new String[4];
strArr[0] = "ABC";
strArr[1] = "DEF";
strArr[2] = "GHI";
strArr[3] = "JKL";
Array strArray = conn.createArrayOf("VARCHAR", strArr);
Tuple strArrTuple = Storage.tuple("ABC", "DEF", "GHI", "JKL");
String[] strArr2 = new String[2];
strArr2[0] = "ABC";
strArr2[1] = "XYZ";
Array strArray2 = conn.createArrayOf("VARCHAR", strArr2);
Tuple strArrTuple2 = Storage.tuple("ABC", "XYZ");
//upsert data.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?, ?, ?, ?, ?) ";
PreparedStatement stmt = conn.prepareStatement(dml);
stmt.setInt(1, 1);
stmt.setArray(2, doubleArray);
stmt.setArray(3, strArray);
stmt.setString(4, "ONE,TWO,THREE");
stmt.setString(5, ",");
stmt.execute();
stmt.setInt(1, 2);
stmt.setArray(2, doubleArray2);
stmt.setArray(3, strArray2);
stmt.setString(4, "FOUR:five:six");
stmt.setString(5, ":");
stmt.execute();
conn.commit();
Tuple dynArrTuple = Storage.tuple("ONE", "TWO", "THREE");
Tuple dynArrTuple2 = Storage.tuple("FOUR", "five", "six");
//sql query
final String sqlQuery = String.format(" SELECT ID, A_DOUBLE_ARRAY, A_VARCHAR_ARRAY, REGEXP_SPLIT(a_concat_str, sep) AS flattend_str FROM %s ", TABLE);
final Data data = Storage.resetData(pigServer);
List<Tuple> expectedList = new ArrayList<Tuple>();
expectedList.add(Storage.tuple(1, 3L, 4L, dynArrTuple));
expectedList.add(Storage.tuple(2, 2L, 2L, dynArrTuple2));
final String load = String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",sqlQuery,zkQuorum);
pigServer.setBatchOn();
pigServer.registerQuery(load);
pigServer.registerQuery("B = FOREACH A GENERATE ID, SIZE(A_DOUBLE_ARRAY), SIZE(A_VARCHAR_ARRAY), FLATTEND_STR;");
pigServer.registerQuery("STORE B INTO 'out' using mock.Storage();");
pigServer.executeBatch();
List<Tuple> actualList = data.get("out");
assertEquals(expectedList.size(), actualList.size());
assertEquals(expectedList, actualList);
Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(4, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("ID"));
assertTrue(fields.get(0).type == DataType.INTEGER);
assertTrue(fields.get(1).alias.equalsIgnoreCase("A_DOUBLE_ARRAY"));
assertTrue(fields.get(1).type == DataType.TUPLE);
assertTrue(fields.get(2).alias.equalsIgnoreCase("A_VARCHAR_ARRAY"));
assertTrue(fields.get(2).type == DataType.TUPLE);
assertTrue(fields.get(3).alias.equalsIgnoreCase("FLATTEND_STR"));
assertTrue(fields.get(3).type == DataType.TUPLE);
Iterator<Tuple> iterator = pigServer.openIterator("A");
Tuple firstTuple = Storage.tuple(1, doubleArrTuple, strArrTuple, dynArrTuple);
Tuple secondTuple = Storage.tuple(2, doubleArrTuple2, strArrTuple2, dynArrTuple2);
List<Tuple> expectedRows = Lists.newArrayList(firstTuple, secondTuple);
List<Tuple> actualRows = Lists.newArrayList();
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
actualRows.add(tuple);
}
assertEquals(expectedRows, actualRows);
}
/**
*
* @throws Exception
*/
@Test
public void testLoadForArrayWithTable() throws Exception {
//create the table
final String TABLE = "TABLE15";
String ddl = "CREATE TABLE " + TABLE
+ " ( ID INTEGER PRIMARY KEY, a_double_array double array[])";
conn.createStatement().execute(ddl);
Double[] doubleArr = new Double[3];
doubleArr[0] = 2.2;
doubleArr[1] = 4.4;
doubleArr[2] = 6.6;
Array doubleArray = conn.createArrayOf("DOUBLE", doubleArr);
Tuple doubleArrTuple = Storage.tuple(2.2d, 4.4d, 6.6d);
Double[] doubleArr2 = new Double[2];
doubleArr2[0] = 12.2;
doubleArr2[1] = 22.2;
Array doubleArray2 = conn.createArrayOf("DOUBLE", doubleArr2);
Tuple doubleArrTuple2 = Storage.tuple(12.2d, 22.2d);
//upsert data.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?, ?) ";
PreparedStatement stmt = conn.prepareStatement(dml);
stmt.setInt(1, 1);
stmt.setArray(2, doubleArray);
stmt.execute();
stmt.setInt(1, 2);
stmt.setArray(2, doubleArray2);
stmt.execute();
conn.commit();
final Data data = Storage.resetData(pigServer);
List<Tuple> expectedList = new ArrayList<Tuple>();
expectedList.add(Storage.tuple(1, doubleArrTuple));
expectedList.add(Storage.tuple(2, doubleArrTuple2));
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
pigServer.registerQuery("STORE A INTO 'out' using mock.Storage();");
pigServer.executeBatch();
List<Tuple> actualList = data.get("out");
assertEquals(expectedList.size(), actualList.size());
assertEquals(expectedList, actualList);
}
}