/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.end2end;

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
import static org.apache.phoenix.query.QueryServices.DYNAMIC_JARS_DIR_KEY;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.LOCALHOST;
import static org.junit.Assert.*;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.expression.function.UDFExpression;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.FunctionAlreadyExistsException;
import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.ValueRangeExcpetion;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.junit.rules.TestName;

@Category(NeedsOwnMiniClusterTest.class)
public class UserDefinedFunctionsIT extends BaseOwnClusterIT {
    /** Tenant id appended to the JDBC URL when a tenant-specific connection is opened. */
    protected static final String TENANT_ID = "ZZTop";
    /** JDBC URL of the mini cluster; assigned once in {@code doSetup()}. */
    private static String url;
    /** Test driver registered for {@code url}; assigned once in {@code doSetup()}. */
    private static PhoenixTestDriver driver;
    /** Testing utility used to start the mini DFS, ZooKeeper and HBase clusters in {@code doSetup()}. */
    private static HBaseTestingUtility util;

    /**
     * Source text of the {@code evaluate} method for the generated string-reverse UDF.
     * The generated code evaluates the first child expression and copies its characters
     * into a new byte array in reverse order, one (possibly multi-byte) character at a time.
     */
    private static String STRING_REVERSE_EVALUATE_METHOD =
            "    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n"
            + "        Expression arg = getChildren().get(0);\n"
            + "        if (!arg.evaluate(tuple, ptr)) {\n"
            + "           return false;\n"
            + "       }\n"
            + "       int targetOffset = ptr.getLength();\n"
            + "       if (targetOffset == 0) {\n"
            + "            return true;\n"
            + "        }\n"
            + "        byte[] source = ptr.get();\n"
            + "        byte[] target = new byte[targetOffset];\n"
            + "        int sourceOffset = ptr.getOffset(); \n"
            + "        int endOffset = sourceOffset + ptr.getLength();\n"
            + "        SortOrder sortOrder = arg.getSortOrder();\n"
            + "        while (sourceOffset < endOffset) {\n"
            + "            int nBytes = StringUtil.getBytesInChar(source[sourceOffset], sortOrder);\n"
            + "            targetOffset -= nBytes;\n"
            + "            System.arraycopy(source, sourceOffset, target, targetOffset, nBytes);\n"
            + "            sourceOffset += nBytes;\n"
            + "        }\n"
            + "        ptr.set(target);\n"
            + "        return true;\n"
            + "    }\n";

    /**
     * Source text of the {@code evaluate} method for the generated sum UDF.
     * The generated code evaluates every child expression as an integer and writes
     * the sum of those values back into the pointer.
     */
    private static String SUM_COLUMN_VALUES_EVALUATE_METHOD =
            "    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n"
            + "        int[] array = new int[getChildren().size()];\n"
            + "        int i = 0;\n"
            + "        for(Expression child:getChildren()) {\n"
            + "            if (!child.evaluate(tuple, ptr)) {\n"
            + "                return false;\n"
            + "            }\n"
            + "            int targetOffset = ptr.getLength();\n"
            + "            if (targetOffset == 0) {\n"
            + "                return true;\n"
            + "            }\n"
            + "            array[i++] = (Integer) PInteger.INSTANCE.toObject(ptr);\n"
            + "        }\n"
            + "        int sum = 0;\n"
            + "        for(i=0;i<getChildren().size();i++) {\n"
            + "            sum+=array[i];\n"
            + "        }\n"
            + "        ptr.set(PInteger.INSTANCE.toBytes((Integer)sum));\n"
            + "        return true;\n"
            + "    }\n";
    /**
     * Source text of the {@code evaluate} method for the generated array-element UDF.
     * The generated code decodes the second child as the index, rejects negative indexes
     * with a {@code ParseException}, and positions the pointer at that element of the
     * array produced by the first child.
     */
    private static String ARRAY_INDEX_EVALUATE_METHOD =
            "    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n"
            + "        Expression indexExpr = children.get(1);\n"
            + "        if (!indexExpr.evaluate(tuple, ptr)) {\n"
            + "           return false;\n"
            + "        } else if (ptr.getLength() == 0) {\n"
            + "           return true;\n"
            + "        }\n"
            + "        // Use Codec to prevent Integer object allocation\n"
            + "        int index = PInteger.INSTANCE.getCodec().decodeInt(ptr, indexExpr.getSortOrder());\n"
            + "        if(index < 0) {\n"
            + "           throw new ParseException(\"Index cannot be negative :\" + index);\n"
            + "        }\n"
            + "        Expression arrayExpr = children.get(0);\n"
            + "        return PArrayDataTypeDecoder.positionAtArrayElement(tuple, ptr, index, arrayExpr, getDataType(),getMaxLength());\n"
            + "    }\n";

    /**
     * Source text of the {@code evaluate} method for the generated {@code GetY} UDF.
     * The generated code evaluates the first child, reads a short from the start of the
     * pointer's backing array and writes it back as an integer.
     */
    private static String GETY_EVALUATE_METHOD =
            "    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n"
            + "        Expression arg = getChildren().get(0);\n"
            + "        if (!arg.evaluate(tuple, ptr)) {\n"
            + "           return false;\n"
            + "        }\n"
            + "        int targetOffset = ptr.getLength();\n"
            + "        if (targetOffset == 0) {\n"
            + "           return true;\n"
            + "        }\n"
            + "        byte[] s = ptr.get();\n"
            + "        int retVal = (int)Bytes.toShort(s);\n"
            + "        ptr.set(PInteger.INSTANCE.toBytes(retVal));\n"
            + "        return true;\n"
            + "    }\n";
    /**
     * Source text of the {@code evaluate} method for the generated {@code GetX} UDF.
     * The generated code evaluates the first child, reads a long from the start of the
     * pointer's backing array, reverses its byte order and writes it back as a long.
     */
    private static String GETX_EVALUATE_METHOD =
            "    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n"
            + "        Expression arg = getChildren().get(0);\n"
            + "        if (!arg.evaluate(tuple, ptr)) {\n"
            + "           return false;\n"
            + "        }\n"
            + "        int targetOffset = ptr.getLength();\n"
            + "        if (targetOffset == 0) {\n"
            + "           return true;\n"
            + "        }\n"
            + "        byte[] s = ptr.get();\n"
            + "        Long retVal = Long.reverseBytes(Bytes.toLong(s));\n"
            + "        ptr.set(PLong.INSTANCE.toBytes(retVal));\n"
            + "        return true;\n"
            + "    }\n";

    
    // Simple names of the UDF classes whose source is generated for these tests.
    private static String MY_REVERSE_CLASS_NAME = "MyReverse";
    private static String MY_SUM_CLASS_NAME = "MySum";
    private static String MY_ARRAY_INDEX_CLASS_NAME = "MyArrayIndex";
    private static String GETX_CLASSNAME = "GetX";
    private static String GETY_CLASSNAME = "GetY";
    // Complete Java source of each UDF class, produced by getProgram(...) from the class name,
    // the matching evaluate-method text and the statement used as the body of getDataType().
    private static String MY_REVERSE_PROGRAM = getProgram(MY_REVERSE_CLASS_NAME, STRING_REVERSE_EVALUATE_METHOD, "return PVarchar.INSTANCE;");
    private static String MY_SUM_PROGRAM = getProgram(MY_SUM_CLASS_NAME, SUM_COLUMN_VALUES_EVALUATE_METHOD, "return PInteger.INSTANCE;");
    // The array-element UDF returns the element type of its first (array) argument.
    private static String MY_ARRAY_INDEX_PROGRAM = getProgram(MY_ARRAY_INDEX_CLASS_NAME, ARRAY_INDEX_EVALUATE_METHOD, "return PDataType.fromTypeId(children.get(0).getDataType().getSqlType()- PDataType.ARRAY_TYPE_BASE);");
    private static String GETX_CLASSNAME_PROGRAM = getProgram(GETX_CLASSNAME, GETX_EVALUATE_METHOD, "return PLong.INSTANCE;");
    private static String GETY_CLASSNAME_PROGRAM = getProgram(GETY_CLASSNAME, GETY_EVALUATE_METHOD, "return PInteger.INSTANCE;");
    // Empty connection properties passed to driver.connect(...) by the tests.
    private static Properties EMPTY_PROPS = new Properties();
    
    /** JUnit rule that exposes the name of the currently running test method. */
    @Rule
    public TestName name = new TestName();

    /**
     * Deletes {@code myjar1.jar} through {@code myjar8.jar} from the dynamic jars directory
     * after every test, so that each test starts from the jars recreated by the
     * {@code @Before} method.
     *
     * @throws Exception if connecting or any {@code delete jar} statement fails
     */
    @Override
    @After
    public void cleanUpAfterTest() throws Exception {
        String jarDir = util.getConfiguration().get(QueryServices.DYNAMIC_JARS_DIR_KEY);
        // try-with-resources closes the statement and connection even when a delete fails.
        try (Connection conn = driver.connect(url, EMPTY_PROPS);
                Statement stmt = conn.createStatement()) {
            for (int jarNumber = 1; jarNumber <= 8; jarNumber++) {
                // The extra "/" is kept on purpose: the statement text is unchanged from
                // the hand-written version this loop replaces.
                stmt.execute("delete jar '" + jarDir + "/" + "myjar" + jarNumber + ".jar'");
            }
            conn.commit();
        }
    }

    /**
     * Compiles the generated UDF sources before every test.
     * Each row below is {class name, program source}; its 1-based position is passed as the
     * third argument of {@code compileTestClass} (presumably the number N of the
     * {@code myjarN.jar} that is produced; confirm against {@code compileTestClass}).
     * The array-index UDF is deliberately compiled twice, for positions 3 and 4.
     */
    @Before
    public void doSetupBeforeTest() throws Exception {
        String[][] udfSources = {
            {MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM},
            {MY_SUM_CLASS_NAME, MY_SUM_PROGRAM},
            {MY_ARRAY_INDEX_CLASS_NAME, MY_ARRAY_INDEX_PROGRAM},
            {MY_ARRAY_INDEX_CLASS_NAME, MY_ARRAY_INDEX_PROGRAM},
            {GETX_CLASSNAME, GETX_CLASSNAME_PROGRAM},
            {GETY_CLASSNAME, GETY_CLASSNAME_PROGRAM},
        };
        for (int position = 1; position <= udfSources.length; position++) {
            String[] udf = udfSources[position - 1];
            compileTestClass(udf[0], udf[1], position);
        }
    }

    /**
     * Builds the Java source of a {@code ScalarFunction} subclass used as a test UDF.
     *
     * <p>Every generated line, including each import, is newline-terminated and each import
     * is emitted once, so line numbers in compiler diagnostics for the generated source
     * point at a single statement.
     *
     * @param className simple name of the generated class; also used as its {@code NAME}
     * @param evaluateMethod complete source of the {@code evaluate} method, placed after an
     *        {@code @Override} annotation
     * @param returnType statement(s) forming the body of {@code getDataType()}, e.g.
     *        {@code "return PVarchar.INSTANCE;"}
     * @return the complete source text of the class in package
     *         {@code org.apache.phoenix.end2end}
     */
    private static String getProgram(String className, String evaluateMethod, String returnType) {
        // StringBuilder: the buffer never leaves this method, so no synchronization is needed.
        return new StringBuilder()
                .append("package org.apache.phoenix.end2end;\n")
                .append("import java.sql.SQLException;\n")
                .append("import java.util.List;\n")
                .append("import java.lang.Long;\n")
                .append("import java.lang.Integer;\n")
                .append("import org.apache.hadoop.hbase.io.ImmutableBytesWritable;\n")
                .append("import org.apache.hadoop.hbase.util.Bytes;\n")
                .append("import org.apache.phoenix.schema.types.PLong;\n")
                .append("import org.apache.phoenix.schema.types.PInteger;\n")
                .append("import org.apache.phoenix.expression.Expression;\n")
                .append("import org.apache.phoenix.expression.function.ScalarFunction;\n")
                .append("import org.apache.phoenix.schema.SortOrder;\n")
                .append("import org.apache.phoenix.schema.tuple.Tuple;\n")
                .append("import org.apache.phoenix.schema.types.PDataType;\n")
                .append("import org.apache.phoenix.schema.types.PVarchar;\n")
                .append("import org.apache.phoenix.util.StringUtil;\n")
                .append("import org.apache.phoenix.schema.types.PArrayDataType;\n")
                .append("import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;\n")
                .append("import org.apache.phoenix.parse.ParseException;\n")
                .append("public class ").append(className).append(" extends ScalarFunction{\n")
                .append("    public static final String NAME = \"").append(className).append("\";\n")
                .append("    public ").append(className).append("() {\n")
                .append("    }\n")
                .append("    public ").append(className).append("(List<Expression> children) throws SQLException {\n")
                .append("        super(children);\n")
                .append("    }\n")
                .append("    @Override\n")
                .append(evaluateMethod)
                .append("    @Override\n")
                .append("    public SortOrder getSortOrder() {\n")
                .append("        return getChildren().get(0).getSortOrder();\n")
                .append("    }\n")
                .append("    @Override\n")
                .append("    public PDataType getDataType() {\n")
                .append("        ").append(returnType).append("\n")
                .append("    }\n")
                .append("    @Override\n")
                .append("    public String getName() {\n")
                .append("        return NAME;\n")
                .append("    }\n")
                .append("}\n")
                .toString();
    }

    /**
     * One-time setup: starts the mini DFS, ZooKeeper and HBase clusters, points the dynamic
     * jars directory at the mini DFS, and registers a test driver with user-defined
     * functions enabled.
     */
    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        Configuration clusterConf = HBaseConfiguration.create();
        setUpConfigForMiniCluster(clusterConf);
        util = new HBaseTestingUtility(clusterConf);
        util.startMiniDFSCluster(1);
        util.startMiniZKCluster(1);

        // PHOENIX-4675: the trailing slash is deliberate, it implicitly tests that the
        // jar path is normalized.
        String defaultFs = util.getConfiguration().get("fs.defaultFS");
        String dynamicJarsDir = defaultFs + "/hbase/tmpjars/";
        clusterConf.set(DYNAMIC_JARS_DIR_KEY, dynamicJarsDir);

        // HBase is started only after the jars directory has been configured.
        util.startMiniHBaseCluster(1, 1);
        UDFExpression.setConfig(clusterConf);

        String zkClientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
        String hostAndPort = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + zkClientPort;
        url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + hostAndPort
                + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;

        Map<String, String> driverProps = Maps.newHashMapWithExpectedSize(1);
        driverProps.put(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true");
        driverProps.put(QueryServices.DYNAMIC_JARS_DIR_KEY, dynamicJarsDir);
        ReadOnlyProps readOnlyProps = new ReadOnlyProps(driverProps.entrySet().iterator());
        driver = initAndRegisterTestDriver(url, readOnlyProps);
    }
    
    /**
     * Verifies that {@code list jars} returns exactly {@code myjar1.jar} through
     * {@code myjar6.jar} from the dynamic jars directory, in that order.
     */
    @Test
    public void testListJars() throws Exception {
        Path jarPath = new Path(util.getConfiguration().get(QueryServices.DYNAMIC_JARS_DIR_KEY));
        // try-with-resources releases the JDBC objects even when an assertion fails.
        try (Connection conn = driver.connect(url, EMPTY_PROPS);
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("list jars")) {
            for (int jarNumber = 1; jarNumber <= 6; jarNumber++) {
                String jarName = "myjar" + jarNumber + ".jar";
                assertTrue("missing row for " + jarName, rs.next());
                assertEquals(new Path(jarPath, jarName).toString(), rs.getString("jar_location"));
            }
            assertFalse(rs.next());
        }
    }

    /**
     * Verifies that {@code delete jar} removes exactly the named jar: the listing contains
     * {@code myjar1.jar} through {@code myjar6.jar} before {@code myjar4.jar} is deleted,
     * and the same jars without {@code myjar4.jar} afterwards.
     */
    @Test
    public void testDeleteJar() throws Exception {
        Path jarPath = new Path(util.getConfiguration().get(QueryServices.DYNAMIC_JARS_DIR_KEY));
        // try-with-resources releases the JDBC objects even when an assertion fails.
        try (Connection conn = driver.connect(url, EMPTY_PROPS);
                Statement stmt = conn.createStatement()) {
            try (ResultSet rs = stmt.executeQuery("list jars")) {
                for (int jarNumber : new int[] {1, 2, 3, 4, 5, 6}) {
                    String jarName = "myjar" + jarNumber + ".jar";
                    assertTrue("missing row for " + jarName, rs.next());
                    assertEquals(new Path(jarPath, jarName).toString(), rs.getString("jar_location"));
                }
                assertFalse(rs.next());
            }

            stmt.execute("delete jar '" + new Path(jarPath, "myjar4.jar").toString() + "'");

            try (ResultSet rs = stmt.executeQuery("list jars")) {
                // myjar4.jar must be gone; the other jars keep their order.
                for (int jarNumber : new int[] {1, 2, 3, 5, 6}) {
                    String jarName = "myjar" + jarNumber + ".jar";
                    assertTrue("missing row for " + jarName, rs.next());
                    assertEquals(new Path(jarPath, jarName).toString(), rs.getString("jar_location"));
                }
                assertFalse(rs.next());
            }
        }
    }

    /**
     * Exercises {@code create function} / {@code drop function} end to end:
     * <ul>
     *   <li>a UDF created with an explicit jar can be used in a select list and a where clause;</li>
     *   <li>creating the same function twice fails with {@link FunctionAlreadyExistsException};</li>
     *   <li>a UDF created without a jar clause is still resolvable;</li>
     *   <li>a tenant connection can define a function with the same name as a global one,
     *       can call global functions, but cannot drop them;</li>
     *   <li>dropped or unknown functions raise {@link FunctionNotFoundException};</li>
     *   <li>a UDF taking an array argument works from both the global and the tenant connection.</li>
     * </ul>
     * Both connections are opened with try-with-resources so they are closed even when an
     * assertion fails. Statements created ad hoc via {@code createStatement()} are not closed
     * individually (NOTE(review): JDBC specifies they are closed with their connection).
     */
    @Test
    public void testCreateFunction() throws Exception {
        String jarDir = util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY);
        String createMyReverse =
                "create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."
                        + MY_REVERSE_CLASS_NAME + "' using jar " + "'" + jarDir + "/myjar1.jar" + "'";
        try (Connection conn = driver.connect(url, EMPTY_PROPS);
                Statement stmt = conn.createStatement()) {
            conn.createStatement().execute("create table t(k integer primary key, firstname varchar, lastname varchar)");
            stmt.execute("upsert into t values(1,'foo','jock')");
            conn.commit();
            stmt.execute(createMyReverse);
            ResultSet rs = stmt.executeQuery("select myreverse(firstname) from t");
            assertTrue(rs.next());
            assertEquals("oof", rs.getString(1));
            assertFalse(rs.next());
            rs = stmt.executeQuery("select * from t where myreverse(firstname)='oof'");
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("foo", rs.getString(2));
            assertEquals("jock", rs.getString(3));
            assertFalse(rs.next());

            try {
                stmt.execute(createMyReverse);
                fail("Duplicate function should not be created.");
            } catch (FunctionAlreadyExistsException expected) {
                // Expected: myreverse already exists on this connection.
            }
            // Without a jar clause the class should be picked up from the path configured
            // as hbase.dynamic.jars.dir.
            stmt.execute("create function myreverse2(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'");
            rs = stmt.executeQuery("select myreverse2(firstname) from t");
            assertTrue(rs.next());
            assertEquals("oof", rs.getString(1));
            assertFalse(rs.next());
            rs = stmt.executeQuery("select myreverse2('abc'), myreverse2('aba')");
            assertTrue(rs.next());
            assertEquals("cba", rs.getString(1));
            assertEquals("aba", rs.getString(2));
            assertFalse(rs.next());
            rs = stmt.executeQuery("select * from t where myreverse2(firstname)='oof'");
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("foo", rs.getString(2));
            assertEquals("jock", rs.getString(3));
            assertFalse(rs.next());
            conn.createStatement().execute("create table t3(tenant_id varchar not null, k integer not null, firstname varchar, lastname varchar constraint pk primary key(tenant_id,k)) MULTI_TENANT=true");

            try (Connection conn2 = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+TENANT_ID, EMPTY_PROPS)) {
                // A tenant may define a function with the same name as a global one; this
                // must not be reported as a duplicate.
                try {
                    conn2.createStatement().execute("upsert into t3 values(1,'foo','jock')");
                    conn2.commit();
                    conn2.createStatement().execute(createMyReverse);
                    rs = conn2.createStatement().executeQuery("select myreverse(firstname) from t3");
                    assertTrue(rs.next());
                    assertEquals("oof", rs.getString(1));
                } catch (FunctionAlreadyExistsException e) {
                    fail("FunctionAlreadyExistsException should not be thrown");
                }
                // Calling a global UDF on a tenant-specific connection.
                rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3");
                assertTrue(rs.next());
                assertEquals("oof", rs.getString(1));
                try {
                    conn2.createStatement().execute("drop function myreverse2");
                    fail("FunctionNotFoundException should be thrown");
                } catch (FunctionNotFoundException expected) {
                    // Expected: the tenant connection cannot drop the global function.
                }
                conn.createStatement().execute("drop function myreverse2");
                try {
                    rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3");
                    fail("FunctionNotFoundException should be thrown.");
                } catch (FunctionNotFoundException expected) {
                    // Expected: myreverse2 was just dropped on the global connection.
                }
                try {
                    rs = conn2.createStatement().executeQuery("select unknownFunction(firstname) from t3");
                    fail("FunctionNotFoundException should be thrown.");
                } catch (FunctionNotFoundException expected) {
                    // Expected: the function was never created.
                }
                conn.createStatement().execute("CREATE TABLE TESTTABLE10(ID VARCHAR NOT NULL, NAME VARCHAR ARRAY, CITY VARCHAR ARRAY CONSTRAINT pk PRIMARY KEY (ID) )");
                conn.createStatement().execute("create function UDF_ARRAY_ELEM(VARCHAR ARRAY, INTEGER) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_ARRAY_INDEX_CLASS_NAME+"' using jar "
                        + "'" + jarDir + "/myjar3.jar" + "'");
                conn.createStatement().execute("UPSERT INTO TESTTABLE10(ID,NAME,CITY) VALUES('111', ARRAY['JOHN','MIKE','BOB'], ARRAY['NYC','LA','SF'])");
                conn.createStatement().execute("UPSERT INTO TESTTABLE10(ID,NAME,CITY) VALUES('112', ARRAY['CHEN','CARL','ALICE'], ARRAY['BOSTON','WASHINGTON','PALO ALTO'])");
                conn.commit();
                // Index 2 selects the second array element, from both connections.
                rs = conn.createStatement().executeQuery("SELECT ID, UDF_ARRAY_ELEM(NAME, 2) FROM TESTTABLE10");
                assertTrue(rs.next());
                assertEquals("111", rs.getString(1));
                assertEquals("MIKE", rs.getString(2));
                assertTrue(rs.next());
                assertEquals("112", rs.getString(1));
                assertEquals("CARL", rs.getString(2));
                assertFalse(rs.next());
                rs = conn2.createStatement().executeQuery("SELECT ID, UDF_ARRAY_ELEM(NAME, 2) FROM TESTTABLE10");
                assertTrue(rs.next());
                assertEquals("111", rs.getString(1));
                assertEquals("MIKE", rs.getString(2));
                assertTrue(rs.next());
                assertEquals("112", rs.getString(1));
                assertEquals("CARL", rs.getString(2));
                assertFalse(rs.next());
            }
        }
    }

    /**
     * Verifies that the same function name can be bound to different implementations per
     * tenant: the global connection and tenant 1 bind {@code myfunction} to the reverse UDF,
     * tenant 2 binds it to the sum UDF, and each connection rejects a second definition
     * with {@link FunctionAlreadyExistsException}. Queries on each tenant connection must
     * then use that tenant's implementation.
     *
     * <p>All three connections are opened with try-with-resources so they are closed even
     * when an assertion fails.
     */
    @Test
    public void testSameUDFWithDifferentImplementationsInDifferentTenantConnections() throws Exception {
        String jarDir = util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY);
        String createReverseFunction =
                "create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."
                        + MY_REVERSE_CLASS_NAME + "' using jar " + "'" + jarDir + "/myjar1.jar" + "'";
        String createUnknownClassFunction =
                "create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
                        + "'" + jarDir + "/myjar1.jar" + "'";
        String tenantId1 = "tenId1";
        String tenantId2 = "tenId2";

        try (Connection nonTenantConn = driver.connect(url, EMPTY_PROPS)) {
            nonTenantConn.createStatement().execute(createReverseFunction);
            try {
                nonTenantConn.createStatement().execute(createUnknownClassFunction);
                fail("FunctionAlreadyExistsException should be thrown.");
            } catch (FunctionAlreadyExistsException expected) {
                // Expected: myfunction already exists globally.
            }
            nonTenantConn.createStatement().execute("create table t7(tenant_id varchar not null, k integer not null, k1 integer, name varchar constraint pk primary key(tenant_id, k)) multi_tenant=true");

            try (Connection tenant1Conn = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId1, EMPTY_PROPS);
                    Connection tenant2Conn = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId2, EMPTY_PROPS)) {
                tenant1Conn.createStatement().execute("upsert into t7 values(1,1,'jock')");
                tenant1Conn.commit();
                tenant2Conn.createStatement().execute("upsert into t7 values(1,2,'jock')");
                tenant2Conn.commit();

                tenant1Conn.createStatement().execute(createReverseFunction);
                try {
                    tenant1Conn.createStatement().execute(createUnknownClassFunction);
                    fail("FunctionAlreadyExistsException should be thrown.");
                } catch (FunctionAlreadyExistsException expected) {
                    // Expected: tenant 1 already defined myfunction.
                }

                tenant2Conn.createStatement().execute("create function myfunction(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
                        + "'" + jarDir + "/myjar2.jar" + "'");
                try {
                    tenant2Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar "
                            + "'" + jarDir + "/unknown.jar" + "'");
                    fail("FunctionAlreadyExistsException should be thrown.");
                } catch (FunctionAlreadyExistsException expected) {
                    // Expected: tenant 2 already defined myfunction.
                }

                // Tenant 1 sees the reverse implementation.
                ResultSet rs = tenant1Conn.createStatement().executeQuery("select MYFUNCTION(name) from t7");
                assertTrue(rs.next());
                assertEquals("kcoj", rs.getString(1));
                assertFalse(rs.next());
                rs = tenant1Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(name)='kcoj'");
                assertTrue(rs.next());
                assertEquals(1, rs.getInt(1));
                assertEquals(1, rs.getInt(2));
                assertEquals("jock", rs.getString(3));
                assertFalse(rs.next());

                // Tenant 2 sees the sum implementation; the omitted second argument
                // takes its declared default of 10.
                rs = tenant2Conn.createStatement().executeQuery("select MYFUNCTION(k) from t7");
                assertTrue(rs.next());
                assertEquals(11, rs.getInt(1));
                assertFalse(rs.next());
                rs = tenant2Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(k1)=12");
                assertTrue(rs.next());
                assertEquals(1, rs.getInt(1));
                assertEquals(2, rs.getInt(2));
                assertEquals("jock", rs.getString(3));
                assertFalse(rs.next());
            }
        }
    }

    /**
     * Verifies that function metadata is shared between two global connections: a function
     * created on the first connection cannot be re-created on the second, can be used from
     * the second, and once dropped from the second is no longer resolvable from the first.
     *
     * <p>Both connections are opened with try-with-resources so they are closed even when
     * an assertion fails.
     */
    @Test
    public void testUDFsWithMultipleConnections() throws Exception {
        String createReverseFunction =
                "create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."
                        + MY_REVERSE_CLASS_NAME + "' using jar "
                        + "'" + util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar" + "'";
        try (Connection conn1 = driver.connect(url, EMPTY_PROPS)) {
            conn1.createStatement().execute(createReverseFunction);
            try (Connection conn2 = driver.connect(url, EMPTY_PROPS)) {
                try {
                    conn2.createStatement().execute(createReverseFunction);
                    fail("FunctionAlreadyExistsException should be thrown.");
                } catch (FunctionAlreadyExistsException expected) {
                    // Expected: the function was already created through conn1.
                }
                conn2.createStatement().execute("create table t8(k integer not null primary key, k1 integer, name varchar)");
                conn2.createStatement().execute("upsert into t8 values(1,1,'jock')");
                conn2.commit();
                ResultSet rs = conn2.createStatement().executeQuery("select MYFUNCTION(name) from t8");
                assertTrue(rs.next());
                assertEquals("kcoj", rs.getString(1));
                assertFalse(rs.next());
                rs = conn2.createStatement().executeQuery("select * from t8 where MYFUNCTION(name)='kcoj'");
                assertTrue(rs.next());
                assertEquals(1, rs.getInt(1));
                assertEquals(1, rs.getInt(2));
                assertEquals("jock", rs.getString(3));
                assertFalse(rs.next());
                conn2.createStatement().execute("drop function MYFUNCTION");
                try {
                    rs = conn1.createStatement().executeQuery("select MYFUNCTION(name) from t8");
                    fail("FunctionNotFoundException should be thrown");
                } catch (FunctionNotFoundException expected) {
                    // Expected: the drop issued on conn2 must be visible to conn1.
                }
            }
        }
    }
    /**
     * Uses UDFs inside UPSERT SELECT, DELETE and UPSERT VALUES statements and verifies the rows
     * those statements write or remove.
     */
    @Test
    public void testUsingUDFFunctionInDifferentQueries() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        Statement stmt = conn.createStatement();

        // One source row in t1 and an empty target table t2.
        conn.createStatement().execute(
                "create table t1(k integer primary key, firstname varchar, lastname varchar)");
        stmt.execute("upsert into t1 values(1,'foo','jock')");
        conn.commit();
        conn.createStatement().execute(
                "create table t2(k integer primary key, k1 integer, lastname_reverse varchar)");
        conn.commit();

        String jarDir = util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY);
        String udfPackage = "org.apache.phoenix.end2end.";
        stmt.execute("create function mysum3(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as '"
                + udfPackage + MY_SUM_CLASS_NAME + "' using jar '" + jarDir + "/myjar2.jar'");
        stmt.execute("create function myreverse3(VARCHAR) returns VARCHAR as '"
                + udfPackage + MY_REVERSE_CLASS_NAME + "' using jar '" + jarDir + "/myjar1.jar'");

        // UPSERT SELECT: every selected column is produced by a UDF.
        stmt.execute("upsert into t2(k,k1,lastname_reverse) select mysum3(k),mysum3(k,11),myreverse3(lastname) from t1");
        conn.commit();
        ResultSet rows = stmt.executeQuery("select * from t2");
        assertTrue(rows.next());
        assertEquals(11, rows.getInt(1));
        assertEquals(12, rows.getInt(2));
        assertEquals("kcoj", rows.getString(3));
        assertFalse(rows.next());

        // DELETE: both predicates go through UDFs and must match the single row.
        stmt.execute("delete from t2 where myreverse3(lastname_reverse)='jock' and mysum3(k)=21");
        conn.commit();
        rows = stmt.executeQuery("select * from t2");
        assertFalse(rows.next());

        // UPSERT VALUES: function created without a jar clause, applied to a literal.
        stmt.execute("create function myreverse4(VARCHAR CONSTANT defaultValue='null') returns VARCHAR as '"
                + udfPackage + MY_REVERSE_CLASS_NAME + "'");
        stmt.execute("upsert into t2 values(11,12,myreverse4('jock'))");
        conn.commit();
        rows = stmt.executeQuery("select * from t2");
        assertTrue(rows.next());
        assertEquals(11, rows.getInt(1));
        assertEquals(12, rows.getInt(2));
        assertEquals("kcoj", rows.getString(3));
        assertFalse(rows.next());
    }

    /**
     * Verifies the argument metadata declared in CREATE FUNCTION: an explicit second argument
     * is used, an omitted one falls back to the declared default, and a value above the
     * declared maximum is rejected.
     */
    @Test
    public void testVerifyCreateFunctionArguments() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        Statement stmt = conn.createStatement();
        conn.createStatement().execute(
                "create table t4(k integer primary key, k1 integer, lastname varchar)");
        stmt.execute("upsert into t4 values(1,1,'jock')");
        conn.commit();

        String jarLocation = util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar";
        stmt.execute("create function mysum(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."
                + MY_SUM_CLASS_NAME + "' using jar '" + jarLocation + "'");

        // Explicit second argument: 1 + 12.
        ResultSet result = stmt.executeQuery("select mysum(k,12) from t4");
        assertTrue(result.next());
        assertEquals(13, result.getInt(1));

        // Omitted second argument: the declared default of 10 applies.
        result = stmt.executeQuery("select mysum(k) from t4");
        assertTrue(result.next());
        assertEquals(11, result.getInt(1));

        // 20 exceeds the declared maxvalue of 15.
        try {
            stmt.executeQuery("select mysum(k,20) from t4");
            fail("Value Range Exception should be thrown.");
        } catch (ValueRangeExcpetion expected) {
            // expected: argument is outside the declared range
        }
    }

    /**
     * Verifies a function created with CREATE TEMPORARY FUNCTION: it is usable in the select
     * list and the where clause, calls with an unsupported number of arguments fail with
     * FunctionNotFoundException, and the function is no longer resolvable after it is dropped.
     *
     * Any exception other than the expected FunctionNotFoundException is left to propagate so
     * that the test report carries the real exception and its stack trace.
     */
    @Test
    public void testTemporaryFunctions() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        Statement stmt = conn.createStatement();
        conn.createStatement().execute("create table t9(k integer primary key, k1 integer, lastname varchar)");
        stmt.execute("upsert into t9 values(1,1,'jock')");
        conn.commit();
        stmt.execute("create temporary function mysum9(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");

        // Explicit second argument.
        ResultSet rs = stmt.executeQuery("select mysum9(k,12) from t9");
        assertTrue(rs.next());
        assertEquals(13, rs.getInt(1));
        // Second argument omitted: the declared default of 10 is used.
        rs = stmt.executeQuery("select mysum9(k) from t9");
        assertTrue(rs.next());
        assertEquals(11, rs.getInt(1));
        rs = stmt.executeQuery("select k from t9 where mysum9(k)=11");
        assertTrue(rs.next());
        assertEquals(1, rs.getInt(1));

        // Too many arguments.
        try {
            rs = stmt.executeQuery("select k from t9 where mysum9(k,10,'x')=11");
            fail("FunctionNotFoundException should be thrown");
        } catch (FunctionNotFoundException expected) {
            // expected: no mysum9 accepts three arguments
        }
        // No arguments at all.
        try {
            rs = stmt.executeQuery("select mysum9() from t9");
            fail("FunctionNotFoundException should be thrown");
        } catch (FunctionNotFoundException expected) {
            // expected: no mysum9 accepts zero arguments
        }

        stmt.execute("drop function mysum9");
        try {
            rs = stmt.executeQuery("select k from t9 where mysum9(k)=11");
            fail("FunctionNotFoundException should be thrown");
        } catch (FunctionNotFoundException expected) {
            // expected: the temporary function has been dropped
        }
    }

    /**
     * Verifies DROP FUNCTION: the rows added to the system function table by CREATE FUNCTION
     * are removed again, the dropped function can neither be used nor dropped a second time,
     * DROP FUNCTION IF EXISTS tolerates the missing function, and the function can be
     * re-created and used afterwards.
     *
     * Statements that are expected to succeed are executed directly; if one of them throws,
     * the exception propagates with its stack trace and fails the test.
     */
    @Test
    public void testDropFunction() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        Statement stmt = conn.createStatement();
        String query = "select count(*) from "+ SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"";
        ResultSet rs = stmt.executeQuery(query);
        // Make sure the cursor is actually positioned on a row before reading the count.
        assertTrue(rs.next());
        int numRowsBefore = rs.getInt(1);
        stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
        rs = stmt.executeQuery(query);
        assertTrue(rs.next());
        int numRowsAfter = rs.getInt(1);
        // Creating this two-argument function is expected to add three rows.
        assertEquals(3, numRowsAfter - numRowsBefore);

        stmt.execute("drop function mysum6");
        rs = stmt.executeQuery(query);
        assertTrue(rs.next());
        assertEquals(numRowsBefore, rs.getInt(1));

        conn.createStatement().execute("create table t6(k integer primary key, k1 integer, lastname varchar)");
        try {
            rs = stmt.executeQuery("select mysum6(k1) from t6");
            fail("FunctionNotFoundException should be thrown");
        } catch (FunctionNotFoundException expected) {
            // expected: the function has been dropped
        }
        try {
            stmt.execute("drop function mysum6");
            fail("FunctionNotFoundException should be thrown");
        } catch (FunctionNotFoundException expected) {
            // expected: there is nothing left to drop
        }

        // IF EXISTS must not throw FunctionNotFoundException for the missing function.
        stmt.execute("drop function if exists mysum6");

        // Re-creating the function must make it resolvable again.
        stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
                + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");
        rs = stmt.executeQuery("select mysum6(k1) from t6");
    }

    /**
     * Exercises create, drop and create-or-replace of UDFs where each step is observed through
     * a newly opened connection, and finally checks that with
     * {@code QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB} set to "false" an unknown
     * function call fails with FunctionNotFoundException.
     *
     * Each connection is closed before it is replaced, and the last one is closed in a finally
     * block, so the test does not leak connections even when an assertion fails. Statements
     * that are expected to succeed are executed directly so that an unexpected exception
     * propagates with its stack trace.
     */
    @Test
    public void testUDFsWithLatestTimestamp() throws Exception {
        Properties props = new Properties();
        Connection conn = DriverManager.getConnection(url, props);
        try {
            Statement stmt = conn.createStatement();
            String query = "select count(*) from "+ SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"";
            ResultSet rs = stmt.executeQuery(query);
            // Make sure the cursor is positioned on a row before reading the count.
            assertTrue(rs.next());
            int numRowsBefore = rs.getInt(1);
            stmt.execute("create function mysum61(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");

            // A new connection must see the rows added by the create.
            conn.close();
            conn = DriverManager.getConnection(url, props);
            stmt = conn.createStatement();
            rs = stmt.executeQuery(query);
            assertTrue(rs.next());
            int numRowsAfter = rs.getInt(1);
            assertEquals(3, numRowsAfter - numRowsBefore);
            stmt.execute("drop function mysum61");

            // A new connection must see the rows removed by the drop.
            conn.close();
            conn = DriverManager.getConnection(url, props);
            stmt = conn.createStatement();
            rs = stmt.executeQuery(query);
            assertTrue(rs.next());
            assertEquals(numRowsBefore, rs.getInt(1));
            conn.createStatement().execute("create table t62(k integer primary key, k1 integer, lastname varchar)");
            try {
                rs = stmt.executeQuery("select mysum61(k1) from t62");
                fail("FunctionNotFoundException should be thrown");
            } catch (FunctionNotFoundException expected) {
                // expected: the function has been dropped
            }
            try {
                stmt.execute("drop function mysum61");
                fail("FunctionNotFoundException should be thrown");
            } catch (FunctionNotFoundException expected) {
                // expected: there is nothing left to drop
            }
            // IF EXISTS must not throw FunctionNotFoundException for the missing function.
            stmt.execute("drop function if exists mysum61");
            stmt.execute("create function mysum61(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");

            // The re-created function must be resolvable from a new connection.
            conn.close();
            conn = DriverManager.getConnection(url, props);
            stmt = conn.createStatement();
            rs = stmt.executeQuery("select mysum61(k1) from t62");
            conn.createStatement().execute("create table t61(k integer primary key, k1 integer, lastname varchar)");

            conn.close();
            conn = DriverManager.getConnection(url, props);
            stmt = conn.createStatement();
            stmt.execute("upsert into t61 values(1,1,'jock')");
            conn.commit();
            // Create as a string-reversing function, then replace it with the summing one.
            stmt.execute("create function myfunction6(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");
            stmt.execute("create or replace function myfunction6(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar "
                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'");

            conn.close();
            conn = DriverManager.getConnection(url, props);
            stmt = conn.createStatement();
            rs = stmt.executeQuery("select myfunction6(k,12) from t61");
            assertTrue(rs.next());
            assertEquals(13, rs.getInt(1));
            rs = stmt.executeQuery("select myfunction6(k) from t61");
            assertTrue(rs.next());
            assertEquals(11, rs.getInt(1));
            rs = stmt.executeQuery("select k from t61 where myfunction6(k)=11");
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            // Replace it back with the string-reversing implementation.
            stmt.execute("create or replace function myfunction6(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar "
                    + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'");

            conn.close();
            conn = DriverManager.getConnection(url, props);
            stmt = conn.createStatement();
            rs = stmt.executeQuery("select k from t61 where myfunction6(lastname)='kcoj'");
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));

            // With UDFs disallowed on the connection, the two-argument reverse call must not resolve.
            props.setProperty(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "false");
            conn.close();
            conn = DriverManager.getConnection(url, props);
            stmt = conn.createStatement();
            try {
                rs = stmt.executeQuery("select k from t61 where reverse(lastname,11)='kcoj'");
                fail("FunctionNotFoundException should be thrown.");
            } catch (FunctionNotFoundException expected) {
                // expected: no such function can be resolved
            }
        } finally {
            // Closes whichever connection is current; closing an already closed one is a no-op.
            conn.close();
        }
    }

    /**
     * Creates a global and then a local functional index on a UDF expression and checks, via
     * the explain-plan attributes, that queries on that expression are served from the index,
     * and that the query results are correct.
     */
    @Test
    public void testFunctionalIndexesWithUDFFunction() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        Statement stmt = conn.createStatement();
        stmt.execute("create table t5(k integer primary key, k1 integer, lastname_reverse varchar)");
        stmt.execute("create function myreverse5(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."
                + MY_REVERSE_CLASS_NAME + "'");
        stmt.execute("upsert into t5 values(1,1,'jock')");
        conn.commit();

        // Global index on the UDF expression.
        stmt.execute("create index idx on t5(myreverse5(lastname_reverse))");
        String projectionQuery = "select myreverse5(lastname_reverse) from t5";
        ExplainPlanAttributes globalIndexPlan = conn.prepareStatement(projectionQuery)
                .unwrap(PhoenixPreparedStatement.class)
                .optimizeQuery()
                .getExplainPlan()
                .getPlanStepsAsAttributes();
        assertEquals("PARALLEL 1-WAY", globalIndexPlan.getIteratorTypeAndScanSize());
        assertEquals("FULL SCAN ", globalIndexPlan.getExplainScanType());
        assertEquals("IDX", globalIndexPlan.getTableName());
        assertEquals("SERVER FILTER BY FIRST KEY ONLY", globalIndexPlan.getServerWhereFilter());

        ResultSet rows = stmt.executeQuery(projectionQuery);
        assertTrue(rows.next());
        assertEquals("kcoj", rows.getString(1));
        assertFalse(rows.next());

        // Local index on the same expression, queried with a filter on it.
        stmt.execute("create local index idx2 on t5(myreverse5(lastname_reverse))");
        String filterQuery =
                "select k,k1,myreverse5(lastname_reverse) from t5 where myreverse5(lastname_reverse)='kcoj'";
        ExplainPlanAttributes localIndexPlan = conn.prepareStatement(filterQuery)
                .unwrap(PhoenixPreparedStatement.class)
                .optimizeQuery()
                .getExplainPlan()
                .getPlanStepsAsAttributes();
        assertEquals("PARALLEL 1-WAY", localIndexPlan.getIteratorTypeAndScanSize());
        assertEquals("RANGE SCAN ", localIndexPlan.getExplainScanType());
        assertEquals("IDX2(T5)", localIndexPlan.getTableName());
        assertEquals(" [1,'kcoj']", localIndexPlan.getKeyRanges());
        assertEquals("SERVER FILTER BY FIRST KEY ONLY", localIndexPlan.getServerWhereFilter());
        assertEquals("CLIENT MERGE SORT", localIndexPlan.getClientSortAlgo());

        rows = stmt.executeQuery(filterQuery);
        assertTrue(rows.next());
        assertEquals(1, rows.getInt(1));
        assertEquals(1, rows.getInt(2));
        assertEquals("kcoj", rows.getString(3));
        assertFalse(rows.next());
    }

    /**
     * Creates the join item and supplier tables plus the sequence {@code my.seq}, upserts the
     * fixed set of item and supplier rows used by the join test, and commits.
     *
     * @param conn connection used for all DDL and upserts; it is committed but not closed
     */
    private static void initJoinTableValues(Connection conn) throws Exception {
        String createItemTable = "create table " + JOIN_ITEM_TABLE_FULL_NAME +
                "   (\"item_id\" varchar(10) not null primary key, " +
                "    name varchar, " +
                "    price integer, " +
                "    discount1 integer, " +
                "    discount2 integer, " +
                "    \"supplier_id\" varchar(10), " +
                "    description varchar)";
        conn.createStatement().execute(createItemTable);

        String createSupplierTable = "create table " + JOIN_SUPPLIER_TABLE_FULL_NAME +
                "   (\"supplier_id\" varchar(10) not null primary key, " +
                "    name varchar, " +
                "    phone varchar(12), " +
                "    address varchar, " +
                "    loc_id varchar(5))";
        conn.createStatement().execute(createSupplierTable);

        conn.createStatement().execute("CREATE SEQUENCE my.seq");

        // Item rows: item_id, name, price, discount1, discount2, supplier_id, description.
        // The last row references a supplier id that is never inserted.
        Object[][] itemRows = {
            {"0000000001", "T1", 100, 5, 10, "0000000001", "Item T1"},
            {"0000000002", "T2", 200, 5, 8, "0000000001", "Item T2"},
            {"0000000003", "T3", 300, 8, 12, "0000000002", "Item T3"},
            {"0000000004", "T4", 400, 6, 10, "0000000002", "Item T4"},
            {"0000000005", "T5", 500, 8, 15, "0000000005", "Item T5"},
            {"0000000006", "T6", 600, 8, 15, "0000000006", "Item T6"},
            {"invalid001", "INVALID-1", 0, 0, 0, "0000000000", "Invalid item for join test"},
        };
        PreparedStatement itemUpsert = conn.prepareStatement(
                "upsert into " + JOIN_ITEM_TABLE_FULL_NAME +
                "   (\"item_id\", " +
                "    NAME, " +
                "    PRICE, " +
                "    DISCOUNT1, " +
                "    DISCOUNT2, " +
                "    \"supplier_id\", " +
                "    DESCRIPTION) " +
                "values (?, ?, ?, ?, ?, ?, ?)");
        for (Object[] item : itemRows) {
            itemUpsert.setString(1, (String) item[0]);
            itemUpsert.setString(2, (String) item[1]);
            itemUpsert.setInt(3, (Integer) item[2]);
            itemUpsert.setInt(4, (Integer) item[3]);
            itemUpsert.setInt(5, (Integer) item[4]);
            itemUpsert.setString(6, (String) item[5]);
            itemUpsert.setString(7, (String) item[6]);
            itemUpsert.execute();
        }

        // Supplier rows: supplier_id, name, phone, address, loc_id (null for S3 and S4).
        String[][] supplierRows = {
            {"0000000001", "S1", "888-888-1111", "101 YYY Street", "10001"},
            {"0000000002", "S2", "888-888-2222", "202 YYY Street", "10002"},
            {"0000000003", "S3", "888-888-3333", "303 YYY Street", null},
            {"0000000004", "S4", "888-888-4444", "404 YYY Street", null},
            {"0000000005", "S5", "888-888-5555", "505 YYY Street", "10005"},
            {"0000000006", "S6", "888-888-6666", "606 YYY Street", "10006"},
        };
        PreparedStatement supplierUpsert = conn.prepareStatement(
                "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME +
                "   (\"supplier_id\", " +
                "    NAME, " +
                "    PHONE, " +
                "    ADDRESS, " +
                "    LOC_ID) " +
                "values (?, ?, ?, ?, ?)");
        for (String[] supplier : supplierRows) {
            for (int col = 0; col < supplier.length; col++) {
                // JDBC parameter indexes are 1-based.
                supplierUpsert.setString(col + 1, supplier[col]);
            }
            supplierUpsert.execute();
        }

        conn.commit();
    }
    
    /**
     * Runs a sort-merge RIGHT JOIN between the supplier and item tables whose join condition
     * and select list both use a UDF, and verifies every returned row in item_id order. The
     * final item has no matching supplier, so its supplier columns must be null.
     *
     * All work on the connection happens inside the try block so that the connection is closed
     * even when table setup or function creation fails.
     */
    @Test
    public void testUdfWithJoin() throws Exception {
        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", myreverse8(supp.name) FROM "
                + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp RIGHT JOIN " + JOIN_ITEM_TABLE_FULL_NAME
                + " item ON myreverse8(item.\"supplier_id\") = myreverse8(supp.\"supplier_id\") ORDER BY \"item_id\"";
        Connection conn = driver.connect(url, EMPTY_PROPS);
        try {
            initJoinTableValues(conn);
            conn.createStatement().execute(
                    "create function myreverse8(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.MyReverse' using jar "
                            + "'" + util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar" + "'");

            PreparedStatement statement = conn.prepareStatement(query);
            ResultSet rs = statement.executeQuery();

            // Expected rows with a matching supplier: item_id, item name, supplier_id,
            // reversed supplier name.
            String[][] matchedRows = {
                {"0000000001", "T1", "0000000001", "1S"},
                {"0000000002", "T2", "0000000001", "1S"},
                {"0000000003", "T3", "0000000002", "2S"},
                {"0000000004", "T4", "0000000002", "2S"},
                {"0000000005", "T5", "0000000005", "5S"},
                {"0000000006", "T6", "0000000006", "6S"},
            };
            for (String[] expected : matchedRows) {
                assertTrue(rs.next());
                // JUnit's argument order is (expected, actual).
                assertEquals(expected[0], rs.getString(1));
                assertEquals(expected[1], rs.getString(2));
                assertEquals(expected[2], rs.getString(3));
                assertEquals(expected[3], rs.getString(4));
            }

            // The item without a supplier is kept by the RIGHT JOIN with null supplier columns.
            assertTrue(rs.next());
            assertEquals("invalid001", rs.getString(1));
            assertEquals("INVALID-1", rs.getString(2));
            assertNull(rs.getString(3));
            assertNull(rs.getString(4));

            assertFalse(rs.next());
        } finally {
            conn.close();
        }
    }

    /**
     * Creates a function, replaces it with a different implementation and signature via
     * CREATE OR REPLACE, and verifies that the replacement is what gets executed both on the
     * connection that issued the DDL and on a second connection opened afterwards.
     */
    @Test
    public void testReplaceFunction() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        Statement stmt = conn.createStatement();
        conn.createStatement().execute(
                "create table t10(k integer primary key, k1 integer, lastname varchar)");
        stmt.execute("upsert into t10 values(1,1,'jock')");
        conn.commit();

        // First the string-reversing implementation, then replace it with the summing one.
        stmt.execute("create function myfunction63(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."
                + MY_REVERSE_CLASS_NAME + "' using jar '"
                + util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar'");
        stmt.execute("create or replace function myfunction63(INTEGER, INTEGER CONSTANT defaultValue=10 minvalue=1 maxvalue=15 ) returns INTEGER as 'org.apache.phoenix.end2end."
                + MY_SUM_CLASS_NAME + "' using jar '"
                + util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar'");

        final String sumWithExplicitArg = "select myfunction63(k,12) from t10";
        final String sumWithDefaultArg = "select myfunction63(k) from t10";
        final String filterOnSum = "select k from t10 where myfunction63(k)=11";

        // Checks on the connection that performed the replace.
        ResultSet result = stmt.executeQuery(sumWithExplicitArg);
        assertTrue(result.next());
        assertEquals(13, result.getInt(1));
        result = stmt.executeQuery(sumWithDefaultArg);
        assertTrue(result.next());
        assertEquals(11, result.getInt(1));
        result = stmt.executeQuery(filterOnSum);
        assertTrue(result.next());
        assertEquals(1, result.getInt(1));

        // The same checks through a second connection.
        Connection secondConn = driver.connect(url, EMPTY_PROPS);
        stmt = secondConn.createStatement();
        result = stmt.executeQuery(sumWithExplicitArg);
        assertTrue(result.next());
        assertEquals(13, result.getInt(1));
        result = stmt.executeQuery(sumWithDefaultArg);
        assertTrue(result.next());
        assertEquals(11, result.getInt(1));
        result = stmt.executeQuery(filterOnSum);
        assertTrue(result.next());
        assertEquals(1, result.getInt(1));
    }

    /**
     * Verifies that two different UDFs, or the same UDF twice, applied to the same column in
     * one select list each return their own result regardless of their order.
     */
    @Test
    public void testUDFsWithSameChildrenInAQuery() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        Statement stmt = conn.createStatement();
        conn.createStatement().execute(
                "create table t11(k varbinary primary key, k1 integer, lastname varchar)");

        String upsertSql = "UPSERT INTO t11"
                + "(k, k1, lastname) "
                + "VALUES(?,?,?)";
        PreparedStatement upsert = conn.prepareStatement(upsertSql);
        upsert.setBytes(1, new byte[] {0, 0, 0, 0, 0, 0, 0, 1});
        upsert.setInt(2, 1);
        upsert.setString(3, "jock");
        upsert.execute();
        conn.commit();

        stmt.execute("create function udf1(VARBINARY) returns UNSIGNED_LONG as 'org.apache.phoenix.end2end."
                + GETX_CLASSNAME + "' using jar '"
                + util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar5.jar'");
        stmt.execute("create function udf2(VARBINARY) returns INTEGER as 'org.apache.phoenix.end2end."
                + GETY_CLASSNAME + "' using jar '"
                + util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar6.jar'");

        // Values the test expects udf1 and udf2 to return for the single row.
        final long expectedUdf1 = 72057594037927936L;
        final int expectedUdf2 = 0;

        ResultSet row = stmt.executeQuery("select udf1(k), udf2(k) from t11");
        assertTrue(row.next());
        assertEquals(expectedUdf1, row.getLong(1));
        assertEquals(expectedUdf2, row.getInt(2));

        // Same functions in the opposite order.
        row = stmt.executeQuery("select udf2(k), udf1(k) from t11");
        assertTrue(row.next());
        assertEquals(expectedUdf2, row.getInt(1));
        assertEquals(expectedUdf1, row.getLong(2));

        // The same function twice.
        row = stmt.executeQuery("select udf1(k), udf1(k) from t11");
        assertTrue(row.next());
        assertEquals(expectedUdf1, row.getLong(1));
        assertEquals(expectedUdf1, row.getLong(2));
    }

    /**
     * Compiles the given test program, packages the resulting class into a jar and registers
     * that jar with Phoenix through an {@code add jars} statement.
     *
     * The source file, the class file and the local jar are all written to the current working
     * directory and are deleted again before this method returns, whether it succeeds or fails.
     * According to the original note on this method, the jar itself is left at the
     * dynamic.jar.dir location once {@code add jars} has run.
     *
     * @param className simple name of the class declared by {@code program}
     * @param program   complete Java source of the class; it is expected to declare the package
     *                  {@code org.apache.phoenix.end2end}, which is the path used inside the jar
     * @param counter   suffix of the jar name, i.e. the jar is called {@code myjar<counter>.jar}
     * @throws Exception if writing, compiling, packaging or registering the jar fails
     */
    private static void compileTestClass(String className, String program, int counter) throws Exception {
        String javaFileName = className + ".java";
        File javaFile = new File(javaFileName);
        File classFile = new File(className + ".class");
        String jarName = "myjar" + counter + ".jar";
        String jarPath = "." + File.separator + jarName;
        File jarFile = new File(jarPath);
        try {
            String packageName = "org.apache.phoenix.end2end";

            // The platform default charset is used on purpose: javac is invoked below without
            // an -encoding option and therefore reads the source with that same default.
            try (FileOutputStream sourceOut = new FileOutputStream(javaFileName)) {
                sourceOut.write(program.getBytes());
            }

            JavaCompiler jc = ToolProvider.getSystemJavaCompiler();
            // getSystemJavaCompiler() returns null when no compiler is available (e.g. on a JRE).
            assertNotNull("No system Java compiler available; the tests must run on a JDK", jc);
            int result = jc.run(null, null, null, javaFileName);
            assertEquals(0, result);

            // Collect the package directory and all of its parents. Every path keeps its
            // trailing '/' so that it is written as a directory entry, not as an empty file.
            String pathToAdd = packageName.replace('.', '/') + '/';
            Set<String> pathsInJar = new HashSet<String>();
            String jarPathStr = pathToAdd;
            while (pathsInJar.add(jarPathStr)) {
                // Start searching before the trailing '/' to find the parent's separator.
                int ix = jarPathStr.lastIndexOf('/', jarPathStr.length() - 2);
                if (ix < 0) {
                    break;
                }
                jarPathStr = jarPathStr.substring(0, ix + 1);
            }

            // A single read() call may return fewer bytes than requested, so loop until the
            // whole class file has been read.
            byte[] allBytes = new byte[(int) classFile.length()];
            try (FileInputStream classIn = new FileInputStream(classFile)) {
                int offset = 0;
                while (offset < allBytes.length) {
                    int read = classIn.read(allBytes, offset, allBytes.length - offset);
                    if (read < 0) {
                        break;
                    }
                    offset += read;
                }
                assertEquals("Short read of " + classFile, allBytes.length, offset);
            }

            Manifest manifest = new Manifest();
            manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
            try (FileOutputStream jarFos = new FileOutputStream(jarPath);
                    JarOutputStream jarOutputStream = new JarOutputStream(jarFos, manifest)) {
                for (String pathInJar : pathsInJar) {
                    jarOutputStream.putNextEntry(new JarEntry(pathInJar));
                    jarOutputStream.closeEntry();
                }
                jarOutputStream.putNextEntry(new JarEntry(pathToAdd + classFile.getName()));
                jarOutputStream.write(allBytes);
                jarOutputStream.closeEntry();
            }

            assertTrue(jarFile.exists());
            try (Connection conn = driver.connect(url, EMPTY_PROPS);
                    Statement stmt = conn.createStatement()) {
                stmt.execute("add jars '" + jarFile.getAbsolutePath() + "'");
            }
        } finally {
            // Best-effort cleanup of the local artifacts.
            javaFile.delete();
            classFile.delete();
            jarFile.delete();
        }
    }

    /**
     * Creates a function from a tenant connection with the {@code using jar} clause pointing at
     * the hbase.dynamic.jars.dir directory itself, and verifies that the function can be used.
     *
     * @throws Exception if any statement or the jar compilation fails
     */
    @Test
    public void testCreateFunctionDynamicJarDir() throws Exception {
        Connection globalConn = driver.connect(url, EMPTY_PROPS);
        String tableName = "table" + name.getMethodName();
        String createTable = "create table " + tableName
                + "(tenant_id varchar not null, k integer not null, "
                + "firstname varchar, lastname varchar constraint pk primary key(tenant_id,k)) MULTI_TENANT=true";
        globalConn.createStatement().execute(createTable);

        // Insert one row through a tenant-specific connection.
        String tenantId = "tenId" + name.getMethodName();
        String tenantUrl = url + ";" + PhoenixRuntime.TENANT_ID_ATTRIB + "=" + tenantId;
        Connection tenantConn = driver.connect(tenantUrl, EMPTY_PROPS);
        Statement tenantStmt = tenantConn.createStatement();
        tenantStmt.execute("upsert into " + tableName + " values(1,'foo','jock')");
        tenantConn.commit();

        compileTestClass(MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM, 7);

        String dynamicJarsDir =
                util.getConfiguration().get(QueryServices.DYNAMIC_JARS_DIR_KEY).toString();
        String createFunction =
                "create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."
                        + MY_REVERSE_CLASS_NAME + "' using jar '" + dynamicJarsDir + "'";
        tenantStmt.execute(createFunction);

        ResultSet reversed = tenantStmt.executeQuery("select myfunction(firstname) from " + tableName);
        assertTrue(reversed.next());
        assertEquals("oof", reversed.getString(1));
    }


    /**
     * Test creating functions using a directory other than hbase.dynamic.jars.dir. Creating or
     * using a function whose jar lives outside that directory is expected to fail with an
     * exception whose root cause is a {@link SecurityException}.
     *
     * The cleanup uses {@code drop function if exists} so that it does not throw when the
     * function was never created, which would otherwise hide the actual test failure.
     *
     * @throws Exception if setup fails or cleanup fails unexpectedly
     */
    @Test
    public void testCreateFunctionNonDynamicJarDir() throws Exception {
        Connection conn = driver.connect(url, EMPTY_PROPS);
        String tableName = "table" + name.getMethodName();

        conn.createStatement().execute("create table " + tableName + "(tenant_id varchar not null, k integer not null, "
                + "firstname varchar, lastname varchar constraint pk primary key(tenant_id,k)) MULTI_TENANT=true");
        String tenantId = "tenId" + name.getMethodName();
        Connection tenantConn = driver.connect(url + ";" + PhoenixRuntime.TENANT_ID_ATTRIB + "=" + tenantId, EMPTY_PROPS);
        Statement stmtTenant = tenantConn.createStatement();
        tenantConn.commit();

        compileTestClass(MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM, 8);
        // Place a copy of the jar outside of the dynamic jars directory.
        Path destJarPathOnHDFS = copyJarsFromDynamicJarsDirToDummyHDFSDir("myjar8.jar");

        try {
            String sql =
                    "create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."
                            + MY_REVERSE_CLASS_NAME + "' using jar '" + destJarPathOnHDFS.toString()
                            + "'";
            stmtTenant.execute(sql);
            stmtTenant.executeQuery("select myfunction(firstname) from " + tableName);
            // fail() throws an AssertionError, which the catch (Exception) below does not intercept.
            fail("expecting java.lang.SecurityException");
        } catch (Exception e) {
            Throwable rootCause = ExceptionUtils.getRootCause(e);
            assertTrue("Expected root cause SecurityException but got " + rootCause + " for " + e,
                    rootCause instanceof SecurityException);
        } finally {
            stmtTenant.execute("drop function if exists myfunction");
        }
    }
    /**
     * Copies a jar from the hbase.dynamic.jars.dir to the data test directory on the test
     * file system. The source jar is left in place and an existing destination is overwritten.
     *
     * @param jarName file name of the jar inside the dynamic jars directory
     * @return the destination jar file path
     * @throws IOException if a file system cannot be obtained or the copy fails
     */
    private Path copyJarsFromDynamicJarsDirToDummyHDFSDir(String jarName) throws IOException {
        Path sourceJar = new Path(util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/" + jarName);
        FileSystem sourceFs = sourceJar.getFileSystem(util.getConfiguration());

        Path targetJar = new Path(util.getDataTestDirOnTestFS().toString() + "/" + jarName);
        FileSystem targetFs = targetJar.getFileSystem(util.getConfiguration());

        final boolean deleteSource = false;
        final boolean overwrite = true;
        FileUtil.copy(sourceFs, sourceJar, targetFs, targetJar, deleteSource, overwrite,
                util.getConfiguration());
        return targetJar;
    }
}
