blob: 25458d6be8b4698bb5eea091286bc1e31fee1198 [file] [log] [blame]
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
*distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you maynot use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicablelaw or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.pig;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.TestUtil.LOCALHOST;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Preconditions;
/**
*
* Test class to run all the integration tests against a virtual map reduce cluster.
*/
public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
private static final Log LOG = LogFactory.getLog(PhoenixHBaseLoaderIT.class);
private static final String SCHEMA_NAME = "T";
private static final String TABLE_NAME = "A";
private static final String INDEX_NAME = "I";
private static final String TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, TABLE_NAME);
private static final String CASE_SENSITIVE_TABLE_NAME = SchemaUtil.getEscapedArgument("a");
private static final String CASE_SENSITIVE_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME,CASE_SENSITIVE_TABLE_NAME);
private String zkQuorum;
private Connection conn;
private PigServer pigServer;
@Before
public void setUp() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
conn = DriverManager.getConnection(getUrl(), props);
zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig());
pigServer = new PigServer(ExecType.LOCAL, getTestClusterConfig());
}
/**
* Validates the schema returned for a table with Pig data types.
* @throws Exception
*/
@Test
public void testSchemaForTable() throws Exception {
final String TABLE = "TABLE1";
final String ddl = String.format("CREATE TABLE %s "
+ " (a_string varchar not null, a_binary varbinary not null, a_integer integer, cf1.a_float float"
+ " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n", TABLE);
conn.createStatement().execute(ddl);
conn.commit();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
final Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(4, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("a_string"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("a_binary"));
assertTrue(fields.get(1).type == DataType.BYTEARRAY);
assertTrue(fields.get(2).alias.equalsIgnoreCase("a_integer"));
assertTrue(fields.get(2).type == DataType.INTEGER);
assertTrue(fields.get(3).alias.equalsIgnoreCase("a_float"));
assertTrue(fields.get(3).type == DataType.FLOAT);
}
/**
* Validates the schema returned when specific columns of a table are given as part of LOAD .
* @throws Exception
*/
@Test
public void testSchemaForTableWithSpecificColumns() throws Exception {
//create the table
final String TABLE = "TABLE2";
final String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
final String selectColumns = "ID,NAME";
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
TABLE, selectColumns, zkQuorum));
Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(2, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("ID"));
assertTrue(fields.get(0).type == DataType.INTEGER);
assertTrue(fields.get(1).alias.equalsIgnoreCase("NAME"));
assertTrue(fields.get(1).type == DataType.CHARARRAY);
}
/**
* Validates the schema returned when a SQL SELECT query is given as part of LOAD .
* @throws Exception
*/
@Test
public void testSchemaForQuery() throws Exception {
//create the table.
final String TABLE = "TABLE3";
String ddl = String.format("CREATE TABLE " + TABLE +
" (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+ " CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", TABLE);
conn.createStatement().execute(ddl);
//sql query for LOAD
final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE;
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
sqlQuery, zkQuorum));
//assert the schema.
Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(3, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("a_string"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("a_integer"));
assertTrue(fields.get(1).type == DataType.INTEGER);
assertTrue(fields.get(2).alias.equalsIgnoreCase("a_double"));
assertTrue(fields.get(2).type == DataType.DOUBLE);
}
/**
* Validates the schema when it is given as part of LOAD..AS
* @throws Exception
*/
@Test
public void testSchemaForTableWithAlias() throws Exception {
//create the table.
final String TABLE = "S.TABLE4";
String ddl = "CREATE TABLE " + TABLE
+ " (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+ " CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL)) \n";
conn.createStatement().execute(ddl);
//select query given as part of LOAD.
final String sqlQuery = "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE;
LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
pigServer.registerQuery(String.format(
"raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
sqlQuery, zkQuorum));
//test the schema.
Schema schema = pigServer.dumpSchema("raw");
List<FieldSchema> fields = schema.getFields();
assertEquals(4, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("a"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("b"));
assertTrue(fields.get(1).type == DataType.BIGDECIMAL);
assertTrue(fields.get(2).alias.equalsIgnoreCase("c"));
assertTrue(fields.get(2).type == DataType.INTEGER);
assertTrue(fields.get(3).alias.equalsIgnoreCase("d"));
assertTrue(fields.get(3).type == DataType.DOUBLE);
}
/**
* @throws Exception
*/
@Test
public void testDataForTable() throws Exception {
//create the table
String ddl = "CREATE TABLE " + CASE_SENSITIVE_TABLE_FULL_NAME
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + CASE_SENSITIVE_TABLE_FULL_NAME + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
stmt.execute();
}
conn.commit();
//load data and filter rows whose age is > 25
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", CASE_SENSITIVE_TABLE_FULL_NAME,
zkQuorum));
pigServer.registerQuery("B = FILTER A BY AGE > 25;");
final Iterator<Tuple> iterator = pigServer.openIterator("B");
int recordsRead = 0;
while (iterator.hasNext()) {
final Tuple each = iterator.next();
assertEquals(3, each.size());
recordsRead++;
}
assertEquals(rows/2, recordsRead);
}
/**
* @throws Exception
*/
@Test
public void testDataForSQLQuery() throws Exception {
//create the table
String ddl = "CREATE TABLE " + TABLE_FULL_NAME
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
stmt.execute();
}
conn.commit();
//sql query
final String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
//load data and filter rows whose age is > 25
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
int recordsRead = 0;
while (iterator.hasNext()) {
iterator.next();
recordsRead++;
}
assertEquals(rows/2, recordsRead);
}
/**
*
* @throws Exception
*/
@Test
public void testForNonPKSQLQuery() throws Exception {
//create the table
final String TABLE = "TABLE5";
String ddl = "CREATE TABLE " + TABLE
+ " ( ID VARCHAR PRIMARY KEY, FOO VARCHAR, BAR INTEGER, BAZ UNSIGNED_INT)";
conn.createStatement().execute(ddl);
//upsert data.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?) ";
PreparedStatement stmt = conn.prepareStatement(dml);
stmt.setString(1, "a");
stmt.setString(2, "a");
stmt.setInt(3,-1);
stmt.setInt(4,1);
stmt.execute();
stmt.setString(1, "b");
stmt.setString(2, "b");
stmt.setInt(3,-2);
stmt.setInt(4,2);
stmt.execute();
conn.commit();
//sql query
final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE);
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
int recordsRead = 0;
while (iterator.hasNext()) {
final Tuple tuple = iterator.next();
assertEquals("a", tuple.get(0));
assertEquals(1, tuple.get(1));
recordsRead++;
}
assertEquals(1, recordsRead);
//test the schema. Test for PHOENIX-1123
Schema schema = pigServer.dumpSchema("A");
List<FieldSchema> fields = schema.getFields();
assertEquals(2, fields.size());
assertTrue(fields.get(0).alias.equalsIgnoreCase("FOO"));
assertTrue(fields.get(0).type == DataType.CHARARRAY);
assertTrue(fields.get(1).alias.equalsIgnoreCase("BAZ"));
assertTrue(fields.get(1).type == DataType.INTEGER);
}
/**
* @throws Exception
*/
@Test
public void testGroupingOfDataForTable() throws Exception {
//create the table
final String TABLE = "TABLE6";
String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
conn.createStatement().execute(ddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
int j = 0, k = 0;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
if(i % 2 == 0) {
stmt.setInt(3, 25);
stmt.setInt(4, 10 * 2 * j++);
} else {
stmt.setInt(3, 30);
stmt.setInt(4, 10 * 3 * k++);
}
stmt.execute();
}
conn.commit();
//prepare the mock storage with expected output
final Data data = Storage.resetData(pigServer);
List<Tuple> expectedList = new ArrayList<Tuple>();
expectedList.add(Storage.tuple(0,180));
expectedList.add(Storage.tuple(0,270));
//load data and filter rows whose age is > 25
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
pigServer.registerQuery("C = FOREACH B GENERATE MIN(A.SAL),MAX(A.SAL);");
pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
pigServer.executeBatch();
List<Tuple> actualList = data.get("out");
assertEquals(expectedList, actualList);
}
/**
* Tests both {@link PhoenixHBaseLoader} and {@link PhoenixHBaseStorage}
* @throws Exception
*/
@Test
public void testLoadAndStore() throws Exception {
//create the tables
final String TABLE = "TABLE7";
final String sourceTableddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
final String targetTable = "AGGREGATE";
final String targetTableddl = "CREATE TABLE " + targetTable
+ "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) ";
conn.createStatement().execute(sourceTableddl);
conn.createStatement().execute(targetTableddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
int j = 0, k = 0;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
if(i % 2 == 0) {
stmt.setInt(3, 25);
stmt.setInt(4, 10 * 2 * j++);
} else {
stmt.setInt(3, 30);
stmt.setInt(4, 10 * 3 * k++);
}
stmt.execute();
}
conn.commit();
//load data and filter rows whose age is > 25
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
pigServer.registerQuery("C = FOREACH B GENERATE group as AGE,MIN(A.SAL),MAX(A.SAL);");
pigServer.registerQuery("STORE C INTO 'hbase://" + targetTable
+ "' using " + PhoenixHBaseStorage.class.getName() + "('"
+ zkQuorum + "', '-batchSize 1000');");
pigServer.executeBatch();
//validate the data with what is stored.
final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE";
final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
assertTrue(rs.next());
assertEquals(25, rs.getInt("AGE"));
assertEquals(0, rs.getInt("MIN_SAL"));
assertEquals(180, rs.getInt("MAX_SAL"));
assertTrue(rs.next());
assertEquals(30, rs.getInt("AGE"));
assertEquals(0, rs.getInt("MIN_SAL"));
assertEquals(270, rs.getInt("MAX_SAL"));
}
/**
* Test for Sequence
* @throws Exception
*/
@Test
public void testDataForSQLQueryWithSequences() throws Exception {
//create the table
final String TABLE = "TABLE8";
String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
conn.createStatement().execute(ddl);
String sequenceDdl = "CREATE SEQUENCE my_sequence";
conn.createStatement().execute(sequenceDdl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
stmt.execute();
}
conn.commit();
//sql query load data and filter rows whose age is > 25
final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE + " WHERE AGE > 25";
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
Iterator<Tuple> iterator = pigServer.openIterator("A");
int recordsRead = 0;
while (iterator.hasNext()) {
iterator.next();
recordsRead++;
}
assertEquals(rows/2, recordsRead);
}
@Test
public void testDataForSQLQueryWithFunctions() throws Exception {
//create the table
final String TABLE = "TABLE9";
String ddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR) ";
conn.createStatement().execute(ddl);
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.execute();
}
conn.commit();
//sql query
final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE + " ORDER BY ID" ;
pigServer.registerQuery(String.format(
"A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
Iterator<Tuple> iterator = pigServer.openIterator("A");
int i = 0;
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
String name = (String)tuple.get(0);
assertEquals("A" + i, name);
i++;
}
}
@Test
public void testDataFromIndexTable() throws Exception {
try {
//create the table
String ddl = "CREATE TABLE " + TABLE_NAME
+ " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true";
conn.createStatement().execute(ddl);
//create a index table
String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) ";
conn.createStatement().execute(indexDdl);
//upsert the data.
final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
stmt.setInt(3, i * 5);
stmt.execute();
}
conn.commit();
pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "') ;");
Iterator<Tuple> iterator = pigServer.openIterator("A");
while (iterator.hasNext()) {
Tuple tuple = iterator.next();
assertEquals("a5", tuple.get(0));
assertEquals(25, tuple.get(1));
}
} finally {
dropTable(TABLE_NAME);
dropTable(INDEX_NAME);
}
}
@Test
public void testLoadOfSaltTable() throws Exception {
final String TABLE = "TABLE11";
final String sourceTableddl = "CREATE TABLE " + TABLE
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) SALT_BUCKETS=2 ";
conn.createStatement().execute(sourceTableddl);
//prepare data with 10 rows having age 25 and the other 30.
final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
PreparedStatement stmt = conn.prepareStatement(dml);
int rows = 20;
int j = 0, k = 0;
for(int i = 0 ; i < rows; i++) {
stmt.setInt(1, i);
stmt.setString(2, "a"+i);
if(i % 2 == 0) {
stmt.setInt(3, 25);
stmt.setInt(4, 10 * 2 * j++);
} else {
stmt.setInt(3, 30);
stmt.setInt(4, 10 * 3 * k++);
}
stmt.execute();
}
conn.commit();
final Data data = Storage.resetData(pigServer);
List<Tuple> expectedList = new ArrayList<Tuple>();
expectedList.add(Storage.tuple(25,10));
expectedList.add(Storage.tuple(30,10));
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
"A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
pigServer.registerQuery("C = FOREACH B GENERATE group,COUNT(A);");
pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
pigServer.executeBatch();
List<Tuple> actualList = data.get("out");
assertEquals(expectedList.size(), actualList.size());
}
@After
public void tearDown() throws Exception {
if(conn != null) {
conn.close();
}
pigServer.shutdown();
}
private void dropTable(String tableFullName) throws SQLException {
Preconditions.checkNotNull(conn);
conn.createStatement().execute(String.format("DROP TABLE IF EXISTS %s",tableFullName));
}
}