PHOENIX-628 Support native JSON data type
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixJsonIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixJsonIT.java
new file mode 100644
index 0000000..dfb0b40
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixJsonIT.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.EqualityNotSupportedException;
+import org.apache.phoenix.schema.json.PhoenixJson;
+import org.apache.phoenix.schema.types.PJson;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+/**
+ * End to end test for JSON data type for {@link PJson} and {@link PhoenixJson}.
+ */
+public class PhoenixJsonIT extends BaseHBaseManagedTimeIT {
+
+    @Test
+    public void testJsonUpsertForJsonHavingChineseAndControlAndQuoteChars() throws Exception {
+        String json = "{\"k1\":\"\\n \\\"jumps \\r'普派'\",\"k2\":true, \"k3\":2}";
+        String selectQuery = "SELECT col1 FROM testJson WHERE pk = 'valueOne'";
+        String pk = "valueOne";
+        Connection conn = getConnection();
+        try {
+
+            createTableAndUpsertRecord(json, pk, conn);
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", json, rs.getString(1));
+            assertFalse(rs.next());
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testJsonUpsertValue() throws Exception {
+        String json = "{\"k1\":\"val\",\"k2\":true, \"k3\":2}";
+        String selectQuery = "SELECT col1 FROM testJson WHERE pk = 'valueOne'";
+        String pk = "valueOne";
+        Connection conn = getConnection();
+        try {
+
+            createTableAndUpsertRecord(json, pk, conn);
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", json, rs.getString(1));
+            assertFalse(rs.next());
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testJsonArrayUpsertValue() throws Exception {
+        Connection conn = getConnection();
+        try {
+            String ddl =
+                    "CREATE TABLE testJson" + "  (pk VARCHAR NOT NULL PRIMARY KEY, " + "col1 json)";
+            createTestTable(getUrl(), ddl);
+
+            HashMap<String, String> jsonDataMap = new HashMap<String, String>();
+
+            jsonDataMap.put("justIntegerArray", "[1,2,3]");
+            jsonDataMap.put("justBooleanArray", "[true,false]");
+            jsonDataMap.put("justStringArray", "[\"One\",\"Two\"]");
+            jsonDataMap.put("mixedArray", "[\"One\",2, true, null]");
+            jsonDataMap.put("arrayInsideAKey", "{\"k1\":{\"k2\":[1,2,3]}}");
+
+            Set<Entry<String, String>> entrySet = jsonDataMap.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                createTableAndUpsertRecord(entry.getValue(), entry.getKey(), conn);
+
+                String selectQuery =
+                        "SELECT col1 FROM testJson WHERE pk = '" + entry.getKey() + "'";
+                PreparedStatement stmt = conn.prepareStatement(selectQuery);
+                ResultSet rs = stmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(
+                    "Json array data read from DB is not as expected for " + entry.getKey(),
+                    entry.getValue(), rs.getString(1));
+                assertFalse(rs.next());
+            }
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testInvalidJsonUpsertValue() throws Exception {
+        String json = "{\"k1\"}";
+        Connection conn = getConnection();
+        try {
+            String ddl =
+                    "CREATE TABLE testJson" + "  (pk VARCHAR NOT NULL PRIMARY KEY, " + "col1 json)";
+            createTestTable(getUrl(), ddl);
+
+            String query = "UPSERT INTO testJson(pk, col1) VALUES(?,?)";
+            PreparedStatement stmt = conn.prepareStatement(query);
+            stmt.setString(1, "valueOne");
+            stmt.setString(2, json);
+            try {
+                stmt.execute();
+            } catch (SQLException sqe) {
+                assertEquals("SQL error code is not as expected when Json is invalid.",
+                    SQLExceptionCode.INVALID_JSON_DATA.getErrorCode(), sqe.getErrorCode());
+                assertEquals("SQL state is not expected when Json is invalid.", "22000",
+                    sqe.getSQLState());
+            }
+            conn.commit();
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testInvalidJsonStringCastAsJson() throws Exception {
+        String json = "{\"k1\":\"val\",\"k2\":true, \"k3\":2}";
+        String pk = "valueOne";
+        String selectQuery = "SELECT cast(pk as json) FROM testJson WHERE pk = 'valueOne'";
+        Connection conn = getConnection();
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(PhoenixJson.class.getName(), rs.getMetaData().getColumnClassName(1));
+            try {
+                rs.getString(1);
+                fail("casting invalid json string to json should fail.");
+            } catch (SQLException sqe) {
+                assertEquals(SQLExceptionCode.INVALID_JSON_DATA.getErrorCode(), sqe.getErrorCode());
+            }
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    private Connection getConnection() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        return conn;
+    }
+
+    @Test
+    public void testValidJsonStringCastAsJson() throws Exception {
+        Connection conn = getConnection();
+        String json = "{\"k1\":\"val\",\"k2\":true, \"k3\":2}";
+        try {
+            createTableAndUpsertRecord(json, json, conn);
+
+            String selectQuery = "SELECT cast(pk as json) FROM testJson";
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(PhoenixJson.class.getName(), rs.getMetaData().getColumnClassName(1));
+            String stringCastToJson = null;
+            try {
+                stringCastToJson = rs.getString(1);
+            } catch (SQLException sqe) {
+                fail("casting valid json string to json should not fail.");
+            }
+
+            assertEquals(json, stringCastToJson);
+            rs.close();
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testJsonCastAsString() throws Exception {
+        Connection conn = getConnection();
+        String json = "{\"k1\":\"val\",\"k2\":true, \"k3\":2}";
+        String pk = "valueOne";
+        String selectQuery = "SELECT cast(col1 as varchar) FROM testJson WHERE pk = 'valueOne'";
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertNotEquals(PhoenixJson.class.getName(), rs.getMetaData().getColumnClassName(1));
+            assertEquals(String.class.getName(), rs.getMetaData().getColumnClassName(1));
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", json, rs.getString(1));
+            assertFalse(rs.next());
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testJsonAsNull() throws Exception {
+        Connection conn = getConnection();
+        String json = null;
+        String pk = "valueOne";
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+
+            /*test is null*/
+            String selectQuery = "SELECT col1 FROM testJson WHERE pk = 'valueOne' and col1 is NULL";
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(PhoenixJson.class.getName(), rs.getMetaData().getColumnClassName(1));
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", json, rs.getString(1));
+
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", PhoenixJson.getInstance(json), rs.getObject(1, PhoenixJson.class));
+            assertFalse(rs.next());
+            
+            /*test is not null*/
+            json = "[1,2,3]";
+            pk = "valueTwo";
+            upsertRecord(json, pk, conn);
+            selectQuery = "SELECT col1 FROM testJson WHERE col1 is not NULL";
+            stmt = conn.prepareStatement(selectQuery);
+            rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(PhoenixJson.class.getName(), rs.getMetaData().getColumnClassName(1));
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", json, rs.getString(1));
+            assertFalse(rs.next());
+            
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testCountDistinct() throws Exception {
+        final int countDistinct = 11;
+        Connection conn = getConnection();
+        String json = null;
+        String pk = "valueOne";
+        String selectQuery = "SELECT DISTINCT_COUNT(col1)  FROM testJson";
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+            for (int i = 0; i < countDistinct; i++) {
+                upsertRecord("[" + i + "]", String.valueOf(i), conn);
+            }
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            stmt.executeQuery();
+        } catch (SQLException sqe) {
+            assertEquals(SQLExceptionCode.NON_EQUALITY_COMPARISON.getErrorCode(),
+                sqe.getErrorCode());
+           
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testDistinct() throws Exception {
+        final int countDistinct = 11;
+        Connection conn = getConnection();
+        String json = null;
+        String pk = "valueOne";
+        String selectQuery = "SELECT DISTINCT(col1)  FROM testJson";
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+            for (int i = 0; i < countDistinct; i++) {
+                upsertRecord("[" + i + "]", String.valueOf(i), conn);
+            }
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            stmt.executeQuery();
+        } catch (SQLException sqe) {
+            assertEquals(SQLExceptionCode.NON_EQUALITY_COMPARISON.getErrorCode(),
+                sqe.getErrorCode());
+           
+        } finally {
+            conn.close();
+        }
+    }
+
+    
+    @Test
+    public void testJsonColumnInWhereClause() throws Exception {
+        Connection conn = getConnection();
+        String json = "[1]";
+        String pk = "valueOne";
+        String selectQuery = "SELECT col1 FROM testJson WHERE pk = 'valueOne' and col1 = '[1]'";
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            stmt.executeQuery();
+            fail("'=' operator should not be allowed with ");
+        } catch (SQLException sqe) {
+            assertEquals(SQLExceptionCode.NON_EQUALITY_COMPARISON.getErrorCode(),
+                sqe.getErrorCode());
+           
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testJsonGroupByColumn() throws Exception {
+        Connection conn = getConnection();
+        String json = "[1]";
+        String pk = "valueOne";
+        String selectQuery = "SELECT col1 FROM testJson group by col1";
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            stmt.executeQuery();
+        } catch (SQLException sqe) {
+            assertEquals(SQLExceptionCode.NON_EQUALITY_COMPARISON.getErrorCode(),
+                sqe.getErrorCode());
+           
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testJsonColumnInWhereClauseOfSubQuery() throws Exception {
+        Connection conn = getConnection();
+        String json = "[1]";
+        String pk = "valueOne";
+        String selectQuery =
+                "SELECT col1 FROM testJson WHERE col1 in (select col1 from testJson where col1='[1]')";
+        try {
+            createTableAndUpsertRecord(json, pk, conn);
+
+            PreparedStatement stmt = conn.prepareStatement(selectQuery);
+            stmt.executeQuery();
+        } catch (SQLException sqe) {
+            assertEquals(SQLExceptionCode.NON_EQUALITY_COMPARISON.getErrorCode(),
+                sqe.getErrorCode());
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    
+    @Test
+    public void testSetObject() throws SQLException {
+
+        Connection conn = getConnection();
+        try {
+            String json = "{\"k1\":\"val\",\"k2\":true, \"k3\":2}";
+            String pk = "valueOne";
+            String ddl =
+                    "CREATE TABLE testJson" + "  (pk VARCHAR NOT NULL PRIMARY KEY, " + "col1 json)";
+            createTestTable(getUrl(), ddl);
+
+            String query = "UPSERT INTO testJson(pk, col1) VALUES(?,?)";
+            PreparedStatement stmt = conn.prepareStatement(query);
+            stmt.setString(1, pk);
+            stmt.setObject(2, PhoenixJson.getInstance(json), java.sql.Types.OTHER);
+            stmt.execute();
+
+            pk = "valueTwo";
+            query = "UPSERT INTO testJson(pk, col1) VALUES(?,?)";
+            stmt = conn.prepareStatement(query);
+            stmt.setString(1, pk);
+            stmt.setObject(2, json, java.sql.Types.OTHER);
+            stmt.execute();
+
+            conn.commit();
+
+            String selectQuery = "SELECT col1 FROM testJson WHERE pk = 'valueOne'";
+            stmt = conn.prepareStatement(selectQuery);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", json, rs.getString(1));
+            assertFalse(rs.next());
+
+            selectQuery = "SELECT col1 FROM testJson WHERE pk = 'valueTwo'";
+            stmt = conn.prepareStatement(selectQuery);
+            rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("Json data read from DB is not as expected for query: <" + selectQuery
+                    + ">", json, rs.getString(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    private void createTableAndUpsertRecord(String json, String pk, Connection conn) throws SQLException {
+        String ddl =
+                "CREATE TABLE testJson" + " (pk VARCHAR NOT NULL PRIMARY KEY, " + "col1 json)";
+        createTestTable(getUrl(), ddl);
+
+        upsertRecord(json, pk, conn);
+    }
+
+    private void upsertRecord(String json, String pk, Connection conn) throws SQLException {
+        String query = "UPSERT INTO testJson(pk, col1) VALUES(?,?)";
+        PreparedStatement stmt = conn.prepareStatement(query);
+        stmt.setString(1, pk);
+        stmt.setString(2, json);
+        stmt.execute();
+        conn.commit();
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index 39baf7a..e2bd19d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -199,6 +199,16 @@
         ParseNode rhsNode = node.getChildren().get(1);
         Expression lhsExpr = children.get(0);
         Expression rhsExpr = children.get(1);
+        PDataType dataTypeOfLHSExpr = lhsExpr.getDataType();
+        if (dataTypeOfLHSExpr != null && !dataTypeOfLHSExpr.isEqualitySupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_EQUALITY_COMPARISON)
+                    .setMessage(" for type " + dataTypeOfLHSExpr).build().buildException();
+        }
+        PDataType dataTypeOfRHSExpr = rhsExpr.getDataType();
+        if (dataTypeOfRHSExpr != null && !dataTypeOfRHSExpr.isEqualitySupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_EQUALITY_COMPARISON)
+                    .setMessage(" for type " + dataTypeOfRHSExpr).build().buildException();
+        }
         CompareOp op = node.getFilterOp();
 
         if (lhsNode instanceof RowValueConstructorParseNode && rhsNode instanceof RowValueConstructorParseNode) {
@@ -479,6 +489,16 @@
                 !rhs.getDataType().isCoercibleTo(lhs.getDataType())) {
             throw TypeMismatchException.newException(lhs.getDataType(), rhs.getDataType(), node.toString());
         }
+        if (!lhs.getDataType().isEqualitySupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_EQUALITY_COMPARISON)
+                    .setMessage(" for type " + lhs.getDataType()).build().buildException();
+        }
+        if (!rhs.getDataType().isEqualitySupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_EQUALITY_COMPARISON)
+                    .setMessage(" for type " + rhs.getDataType()).build().buildException();
+        }
+        
+        
         if (lhsNode instanceof BindParseNode) {
             context.getBindManager().addParamMetaData((BindParseNode)lhsNode, rhs);
         }
@@ -617,6 +637,12 @@
         Expression firstChild = inChildren.get(0);
         ImmutableBytesWritable ptr = context.getTempPtr();
         PDataType firstChildType = firstChild.getDataType();
+        
+        if (firstChildType != null && !firstChildType.isEqualitySupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_EQUALITY_COMPARISON)
+                    .setMessage(" for type " + firstChildType).build().buildException();
+        }
+        
         ParseNode firstChildNode = node.getChildren().get(0);
         
         if (firstChildNode instanceof BindParseNode) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index cf72384..6747661 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -86,6 +86,7 @@
     SUBQUERY_RETURNS_DIFFERENT_NUMBER_OF_FIELDS(216, "22016", "Sub-query must return the same number of fields as the left-hand-side expression of 'IN'."),
     AMBIGUOUS_JOIN_CONDITION(217, "22017", "Amibiguous or non-equi join condition specified. Consider using table list with where clause."),
     CONSTRAINT_VIOLATION(218, "22018", "Constraint violatioin."),
+    INVALID_JSON_DATA(219, "22000", "Invalid json data."),
     
     /**
      * Constraint Violation (errorcode 03, sqlstate 23)
@@ -147,7 +148,7 @@
     ORDER_BY_ARRAY_NOT_SUPPORTED(515, "42893", "ORDER BY of an array type is not allowed"),
     NON_EQUALITY_ARRAY_COMPARISON(516, "42894", "Array types may only be compared using = or !="),
     INVALID_NOT_NULL_CONSTRAINT(517, "42895", "Invalid not null constraint on non primary key column"),
-
+    NON_EQUALITY_COMPARISON(523, "42900", "Could not identify an equality operator"),
     /**
      *  Invalid Transaction State (errorcode 05, sqlstate 25)
      */
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 51f4089..e815817 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -247,7 +247,9 @@
     LogFunction(LogFunction.class),
     ExpFunction(ExpFunction.class),
     PowerFunction(PowerFunction.class),
-    ArrayConcatFunction(ArrayConcatFunction.class)
+    ArrayConcatFunction(ArrayConcatFunction.class),
+    JsonExtractPathFunction(JsonExtractPathFunction.class),
+    JsonExtractPathTextFunction(JsonExtractPathTextFunction.class)
     ;
 
     ExpressionType(Class<? extends Expression> clazz) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java
index 6ce3c27..f014c95 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java
@@ -21,13 +21,13 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.DistinctCountClientAggregator;
 import org.apache.phoenix.expression.aggregator.DistinctValueWithCountServerAggregator;
 import org.apache.phoenix.parse.FunctionParseNode.Argument;
 import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.EqualityNotSupportedException;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -100,6 +100,11 @@
     
     @Override
     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        for (Expression child : getChildren()) {
+            if (!child.getDataType().isEqualitySupported()) {
+                throw new EqualityNotSupportedException(child.getDataType());
+            }
+        }
         // TODO: optimize query plan of this to run scan serially for a limit of one row
         if (!super.evaluate(tuple, ptr)) {
             ptr.set(ZERO); // If evaluate returns false, then no rows were found, so result is 0
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
index cde3e9c..6402496 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
@@ -134,6 +134,13 @@
          if(this.isArray) {
              this.dataType = localType;
          }
+         if (this.dataType != null && !this.dataType.canBePrimaryKey() && isPK) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_PRIMARY_KEY_CONSTRAINT)
+                        .setColumnName(columnDefName.getColumnName())
+                        .setMessage(
+                            "," + this.dataType.toString() + " is not supported as primary key,")
+                        .build().buildException();
+         }
          this.expressionStr = expressionStr;
      } catch (SQLException e) {
          throw new ParseException(e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/EqualityNotSupportedException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/EqualityNotSupportedException.java
new file mode 100644
index 0000000..a2a61c3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/EqualityNotSupportedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.schema.types.PDataType;
+
+public class EqualityNotSupportedException extends RuntimeException {
+    public EqualityNotSupportedException(PDataType<?> pDataType) {
+        super(new SQLExceptionInfo.Builder(SQLExceptionCode.NON_EQUALITY_COMPARISON)
+                .setMessage(" for type " + pDataType.toString()).build().buildException());
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/json/PhoenixJson.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/json/PhoenixJson.java
new file mode 100644
index 0000000..bbd35fa
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/json/PhoenixJson.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.json;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.schema.EqualityNotSupportedException;
+import org.apache.phoenix.schema.types.PJson;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonParser.Feature;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ValueNode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The {@link PhoenixJson} wraps json and uses Jackson library to parse and traverse the json. It
+ * should be used to represent the JSON data type and also should be used to parse Json data and
+ * read the value from it. It always conside the last value if same key exist more than once.
+ */
+public class PhoenixJson implements Comparable<PhoenixJson> {
+    private final JsonNode rootNode;
+    /*
+     * input data has been stored as it is, since some data is lost when json parser runs, for
+     * example if a JSON object within the value contains the same key more than once then only last
+     * one is stored rest all of them are ignored, which will defy the contract of PJsonDataType of
+     * keeping user data as it is.
+     */
+    private final String jsonAsString;
+
+    /**
+     * Static Factory method to get an {@link PhoenixJson} object. It also validates the json and
+     * throws {@link SQLException} if it is invalid with line number and character.
+     * @param jsonData Json data as {@link String}.
+     * @return {@link PhoenixJson}.
+     * @throws SQLException
+     */
+    public static PhoenixJson getInstance(String jsonData) throws SQLException {
+        if (jsonData == null) {
+           return null;
+        }
+        try {
+            JsonFactory jsonFactory = new JsonFactory();
+            JsonParser jsonParser = jsonFactory.createJsonParser(jsonData);
+            JsonNode jsonNode = getRootJsonNode(jsonParser);
+            return new PhoenixJson(jsonNode, jsonData);
+        } catch (IOException x) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_JSON_DATA).setRootCause(x)
+                    .setMessage(x.getMessage()).build().buildException();
+        }
+
+    }
+
+    /**
+     * Returns the root of the resulting {@link JsonNode} tree.
+     */
+    private static JsonNode getRootJsonNode(JsonParser jsonParser) throws IOException,
+            JsonProcessingException {
+        jsonParser.configure(Feature.ALLOW_COMMENTS, true);
+        ObjectMapper objectMapper = new ObjectMapper();
+        try {
+            return objectMapper.readTree(jsonParser);
+        } finally {
+            jsonParser.close();
+        }
+    }
+
+    /* Default for unit testing */PhoenixJson(final JsonNode node, final String jsonData) {
+        Preconditions.checkNotNull(node, "root node cannot be null for json");
+        this.rootNode = node;
+        this.jsonAsString = jsonData;
+    }
+
+    /**
+     * Get {@link PhoenixJson} for a given json paths. For example :
+     * <p>
+     * <code>
+     * {"f2":{"f3":1},"f4":{"f5":99,"f6":{"f7":"2"}}}'
+     * </code>
+     * <p>
+     * for this source json, if we want to know the json at path {'f4','f6'} it will return
+     * {@link PhoenixJson} object for json {"f7":"2"}. It always returns the last key if same key
+     * exist more than once.
+     * <p>
+     * If the given path is unreachable then it throws {@link SQLException}.
+     * @param paths {@link String []} of path in the same order as they appear in json.
+     * @return {@link PhoenixJson} for the json against @paths.
+     * @throws SQLException
+     */
+    public PhoenixJson getPhoenixJson(String[] paths) throws SQLException {
+        try {
+            PhoenixJson phoenixJson = getPhoenixJsonInternal(paths);
+            if (phoenixJson == null) {
+                throw new SQLException("path: " + Arrays.asList(paths) + " not found.");
+            }
+            return phoenixJson;
+        } catch (NumberFormatException nfe) {
+            throw new SQLException("path: " + Arrays.asList(paths) + " not found.", nfe);
+        }
+    }
+
+    /**
+     * Get {@link PhoenixJson} for a given json paths. For example :
+     * <p>
+     * <code>
+     * {"f2":{"f3":1},"f4":{"f5":99,"f6":{"f7":"2"}}}'
+     * </code>
+     * <p>
+     * for this source json, if we want to know the json at path {'f4','f6'} it will return
+     * {@link PhoenixJson} object for json {"f7":"2"}. It always returns the last key if same key
+     * exist more than once.
+     * <p>
+     * If the given path is unreachable then it return null.
+     * @param paths {@link String []} of path in the same order as they appear in json.
+     * @return {@link PhoenixJson} for the json against @paths.
+     */
+    public PhoenixJson getPhoenixJsonOrNull(String[] paths) {
+        try {
+            return getPhoenixJsonInternal(paths);
+        } catch (NumberFormatException nfe) {
+            // ignore
+        }
+        return null;
+    }
+
+    /**
+     * Serialize the current {@link PhoenixJson} to String. Its required for
+     * json_extract_path_text(). If we just return node.toString() it will wrap String value in
+     * double quote which is not the expectation, hence avoiding calling toString() on
+     * {@link JsonNode} until PhoenixJson represent a Json Array or container for Json object. If
+     * PhoenixJson just represent a {@link ValueNode} then it should return value returned from
+     * objects toString().
+     */
+    public String serializeToString() {
+        if (this.rootNode == null || this.rootNode.isNull()) {
+            return null;
+        } else if (this.rootNode.isValueNode()) {
+
+            if (this.rootNode.isNumber()) {
+                return this.rootNode.getNumberValue().toString();
+            } else if (this.rootNode.isBoolean()) {
+                return String.valueOf(this.rootNode.getBooleanValue());
+            } else if (this.rootNode.isTextual()) {
+                return this.rootNode.getTextValue();
+            } else {
+                return this.jsonAsString;
+            }
+        } else if (this.rootNode.isArray()) {
+            return this.jsonAsString;
+        } else if (this.rootNode.isContainerNode()) {
+            return this.jsonAsString;
+        }
+
+        return null;
+
+    }
+
+    @Override
+    public String toString() {
+        return this.jsonAsString;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + this.jsonAsString.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        throw new EqualityNotSupportedException(PJson.INSTANCE);
+    }
+
+    /**
+     * @return length of the string represented by the current {@link PhoenixJson}.
+     */
+    public int estimateByteSize() {
+        String jsonStr = toString();
+        return jsonStr == null ? 1 : jsonStr.length();
+    }
+
+    public byte[] toBytes() {
+        return Bytes.toBytes(this.jsonAsString);
+    }
+
+    @Override
+    public int compareTo(PhoenixJson o) {
+        throw new EqualityNotSupportedException(PJson.INSTANCE);
+    }
+
+    private PhoenixJson getPhoenixJsonInternal(String[] paths) {
+        JsonNode node = this.rootNode;
+        for (String path : paths) {
+            JsonNode nodeTemp = null;
+            if (node.isArray()) {
+                int index = Integer.parseInt(path);
+                nodeTemp = node.path(index);
+            } else {
+                nodeTemp = node.path(path);
+            }
+            if (nodeTemp == null || nodeTemp.isMissingNode()) {
+                return null;
+            }
+            node = nodeTemp;
+        }
+        return new PhoenixJson(node, node.toString());
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataType.java
index 60d2020..b24406e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataType.java
@@ -307,6 +307,21 @@
     return getClass() == o.getClass();
   }
 
+    /**
+     * @return true if {@link PDataType} can be declared as primary key otherwise false.
+     */
+    public boolean canBePrimaryKey() {
+        return true;
+    }
+
+    /**
+     * @return true if {@link PDataType} supports equality operators (=,!=,<,>,<=,>=) otherwise
+     *         false.
+     */
+    public boolean isEqualitySupported() {
+        return true;
+    }
+  
   /**
    * @return true when {@code lhs} equals any of {@code rhs}.
    */
@@ -793,7 +808,7 @@
   public final boolean isNull(byte[] value) {
     return value == null || value.length == 0;
   }
-
+  
   public byte[] toBytes(Object object, SortOrder sortOrder) {
     Preconditions.checkNotNull(sortOrder);
     byte[] bytes = toBytes(object);
@@ -1152,17 +1167,17 @@
       return null;
     }
     for (PDataType type : PDataType.values()) {
-      if (type.isArrayType()) {
-        PhoenixArray arr = (PhoenixArray) value;
-        if ((type.getSqlType() == arr.baseType.sqlType + PDataType.ARRAY_TYPE_BASE)
-            && type.getJavaClass().isInstance(value)) {
-          return type;
+        if(type.getJavaClass().isInstance(value)){
+    		if (type.isArrayType()) {
+    			PhoenixArray arr = (PhoenixArray) value;
+    			if ((type.getSqlType() == arr.baseType.sqlType
+    					+ PDataType.ARRAY_TYPE_BASE)) {
+    				return type;
+    			}
+    		} else {
+    			return type;
+    		}
         }
-      } else {
-        if (type.getJavaClass().isInstance(value)) {
-          return type;
-        }
-      }
     }
     throw new UnsupportedOperationException(
         "Unsupported literal value [" + value + "] of type " + value.getClass().getName());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java
index 45a9657..85ed169 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java
@@ -96,6 +96,7 @@
     types.add(PVarbinaryArray.INSTANCE);
     types.add(PVarchar.INSTANCE);
     types.add(PVarcharArray.INSTANCE);
+    types.add(PJson.INSTANCE);
 
     classToInstance = new HashMap<>(types.size());
     for (PDataType t : types) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PJson.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PJson.java
new file mode 100644
index 0000000..0b556b2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PJson.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.types;
+
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.Format;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.EqualityNotSupportedException;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.json.PhoenixJson;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.StringUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * A Phoenix data type to represent JSON. The json data type stores an exact copy of the input text,
+ * which processing functions must reparse on each execution. Because the json type stores an exact
+ * copy of the input text, it will preserve semantically-insignificant white space between tokens,
+ * as well as the order of keys within JSON objects. Also, if a JSON object within the value
+ * contains the same key more than once, all the key/value pairs are kept. It stores the data as
+ * string in single column of HBase and it has same data size limit as Phoenix's Varchar.
+ * <p>
+ * JSON data types are for storing JSON (JavaScript Object Notation) data, as specified in RFC 7159.
+ * Such data can also be stored as text, but the JSON data types have the advantage of enforcing
+ * that each stored value is valid according to the JSON rules.
+ */
+public class PJson extends PDataType<String> {
+
+    public static final PJson INSTANCE = new PJson();
+
+    PJson() {
+        super("JSON", Types.OTHER, PhoenixJson.class, null, 48);
+    }
+
+    @Override
+    public boolean canBePrimaryKey() {
+        return false;
+    }
+
+    @Override
+    public boolean isEqualitySupported() {
+        return false;
+    }
+
+    @Override
+    public int toBytes(Object object, byte[] bytes, int offset) {
+
+        if (object == null) {
+            return 0;
+        }
+        byte[] b = toBytes(object);
+        System.arraycopy(b, 0, bytes, offset, b.length);
+        return b.length;
+
+    }
+
+    @Override
+    public byte[] toBytes(Object object) {
+        if (object == null) {
+            return ByteUtil.EMPTY_BYTE_ARRAY;
+        }
+        PhoenixJson phoenixJson = (PhoenixJson) object;
+        return phoenixJson.toBytes();
+    }
+
+    @Override
+    public Object toObject(byte[] bytes, int offset, int length,
+            @SuppressWarnings("rawtypes") PDataType actualType, SortOrder sortOrder,
+            Integer maxLength, Integer scale) {
+
+        Object object =
+                PVarchar.INSTANCE.toObject(bytes, offset, length, actualType, sortOrder, maxLength,
+                    scale);
+        /*
+         * avoiding the type casting of object to String by calling toString() since String's
+         * toString() returns itself.
+         */
+        return object == null ? object : getPhoenixJson(object.toString());
+
+    }
+
+    @Override
+    public Object toObject(Object object, @SuppressWarnings("rawtypes") PDataType actualType) {
+        if (object == null) {
+            return null;
+        }
+        if (equalsAny(actualType, PJson.INSTANCE)) {
+            return object;
+        }
+        if (equalsAny(actualType, PVarchar.INSTANCE)) {
+            return getPhoenixJson(object.toString());
+        }
+        return throwConstraintViolationException(actualType, this);
+    }
+
+    @Override
+    public boolean isCoercibleTo(@SuppressWarnings("rawtypes") PDataType targetType) {
+        return equalsAny(targetType, this, PVarchar.INSTANCE);
+
+    }
+
+    @Override
+    public boolean isSizeCompatible(ImmutableBytesWritable ptr, Object value,
+            @SuppressWarnings("rawtypes") PDataType srcType, Integer maxLength, Integer scale,
+            Integer desiredMaxLength, Integer desiredScale) {
+        return PVarchar.INSTANCE.isSizeCompatible(ptr, value, srcType, maxLength, scale,
+            desiredMaxLength, desiredScale);
+    }
+
+    @Override
+    public boolean isFixedWidth() {
+        return false;
+    }
+
+    @Override
+    public int estimateByteSize(Object o) {
+        PhoenixJson phoenixJson = (PhoenixJson) o;
+        return phoenixJson.estimateByteSize();
+    }
+
+    @Override
+    public Integer getByteSize() {
+        return null;
+    }
+
+    @Override
+    public int compareTo(Object lhs, Object rhs, @SuppressWarnings("rawtypes") PDataType rhsType) {
+        if (PJson.INSTANCE != rhsType) {
+            throwConstraintViolationException(rhsType, this);
+        }
+        throw new EqualityNotSupportedException(PJson.INSTANCE);
+    }
+
+    @Override
+    public Object toObject(String value) {
+        return getPhoenixJson(value);
+    }
+
+    @Override
+    public boolean isBytesComparableWith(@SuppressWarnings("rawtypes") PDataType otherType) {
+        return otherType == PJson.INSTANCE || otherType == PVarchar.INSTANCE;
+    }
+
+    @Override
+    public String toStringLiteral(Object o, Format formatter) {
+        if (o == null) {
+            return StringUtil.EMPTY_STRING;
+        }
+        PhoenixJson phoenixJson = (PhoenixJson) o;
+        return PVarchar.INSTANCE.toStringLiteral(phoenixJson.toString(), formatter);
+    }
+
+    @Override
+    public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+        Preconditions.checkArgument(maxLength == null || maxLength >= 0);
+
+        char[] key = new char[4];
+        char[] value = new char[4];
+        int length = maxLength != null ? maxLength : 1;
+        if (length > (key.length + value.length)) {
+            key = new char[length + 2];
+            value = new char[length - key.length];
+        }
+        int j = 1;
+        key[0] = '"';
+        key[j++] = 'k';
+        for (int i = 2; i < key.length - 1; i++) {
+            key[j++] = (char) ('0' + RANDOM.get().nextInt(Byte.MAX_VALUE) % 10);
+        }
+        key[j] = '"';
+
+        int k = 1;
+        value[0] = '"';
+        value[k++] = 'v';
+        for (int i = 2; i < value.length - 1; i++) {
+            value[k++] = (char) ('0' + RANDOM.get().nextInt(Byte.MAX_VALUE) % 10);
+        }
+        value[k] = '"';
+        StringBuilder sbr = new StringBuilder();
+        sbr.append("{").append(key).append(":").append(value).append("}");
+
+        return getPhoenixJson(sbr.toString());
+    }
+
+    private Object getPhoenixJson(String jsonData) {
+        try {
+            return PhoenixJson.getInstance(jsonData);
+        } catch (SQLException sqe) {
+            throw new IllegalDataException(sqe);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarchar.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarchar.java
index fa3dbad..509e090 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarchar.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarchar.java
@@ -83,7 +83,7 @@
 
   @Override
   public boolean isCoercibleTo(PDataType targetType) {
-    return equalsAny(targetType, this, PChar.INSTANCE, PVarbinary.INSTANCE, PBinary.INSTANCE);
+    return equalsAny(targetType, this, PChar.INSTANCE, PVarbinary.INSTANCE, PBinary.INSTANCE, PJsonDataType.INSTANCE);
   }
 
   @Override
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java
index b7baa97..0d49ab0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java
@@ -26,6 +26,7 @@
 import java.util.HashMap;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.json.PhoenixJson;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDecimal;
@@ -34,7 +35,6 @@
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.junit.Test;
-
 import org.apache.phoenix.schema.types.PDataType;
 
 /**
@@ -49,6 +49,7 @@
 	private static final HashMap<Class, Object> map = new HashMap<Class, Object>();
 	
 	static {
+		map.put(PhoenixJson.class, "[1,2,3]");
 		map.put(String.class, "a");
 		map.put(Long.class, 1l);	
 		map.put(Integer.class, 1);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/json/PhoenixJsonTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/json/PhoenixJsonTest.java
new file mode 100644
index 0000000..4f9455d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/json/PhoenixJsonTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.EqualityNotSupportedException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link PhoenixJson}.
+ */
+public class PhoenixJsonTest {
+    public static final String TEST_JSON_STR =
+            "{\"f2\":{\"f3\":\"value\"},\"f4\":{\"f5\":99,\"f6\":[1,true,\"foo\"]},\"f7\":true}";
+
+    @Test
+    public void testParsingForJsonHavingChineseChars() throws Exception {
+
+        String jsonWithChineseChars = "[\"'普派'\"]";
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(jsonWithChineseChars);
+        assertNotNull(phoenixJson);
+        assertEquals(jsonWithChineseChars, phoenixJson.toString());
+
+    }
+
+    @Test
+    public void testParsingForJsonHavingControlAndQuoteChars() throws Exception {
+
+        String jsonWithControlChars = "[\"\\n \\\"jumps \\r'普派'\"]";
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(jsonWithControlChars);
+        assertNotNull(phoenixJson);
+        assertEquals(jsonWithControlChars, phoenixJson.toString());
+
+    }
+
+    @Test
+    public void testEmptyJsonParsing() throws Exception {
+
+        String emptyJson = "{}";
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(emptyJson);
+        assertNotNull(phoenixJson);
+        assertEquals(emptyJson, phoenixJson.toString());
+
+    }
+
+    @Test
+    public void testZeroLengthJsonStringForParsing() throws Exception {
+
+        String emptyJson = "";
+        try {
+            PhoenixJson.getInstance(emptyJson);
+        } catch (SQLException expectedException) {
+            assertEquals("error code is not as expected.",
+                SQLExceptionCode.INVALID_JSON_DATA.getErrorCode(), expectedException.getErrorCode());
+            assertEquals("sql state is not as expected.",
+                SQLExceptionCode.INVALID_JSON_DATA.getSQLState(), expectedException.getSQLState());
+        }
+
+    }
+
+    @Test
+    public void testNullJsonStringForParsing() throws Exception {
+
+        String nullJson = null;
+        try {
+            PhoenixJson.getInstance(nullJson);
+        } catch (SQLException expectedException) {
+            assertEquals("error code is not as expected.",
+                SQLExceptionCode.INVALID_JSON_DATA.getErrorCode(), expectedException.getErrorCode());
+            assertEquals("sql state is not as expected.",
+                SQLExceptionCode.INVALID_JSON_DATA.getSQLState(), expectedException.getSQLState());
+        }
+
+    }
+
+    @Test
+    public void testJsonArrayParsing() throws Exception {
+
+        String jsonArrayString = "[1,2,3]";
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(jsonArrayString);
+        assertNotNull(phoenixJson);
+        assertEquals(jsonArrayString, phoenixJson.toString());
+    }
+
+    @Test
+    public void testVaidJsonParsing() throws Exception {
+
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(TEST_JSON_STR);
+        assertNotNull(phoenixJson);
+        assertEquals(TEST_JSON_STR, phoenixJson.toString());
+    }
+
+    @Test
+    public void getPhoenixJson() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(TEST_JSON_STR);
+        PhoenixJson phoenixJson2 = phoenixJson.getPhoenixJson(new String[] { "f2", "f3" });
+        assertEquals("value", phoenixJson2.serializeToString());
+
+        String[] paths = new String[] { "f2", "f3", "f4" };
+        try {
+            phoenixJson.getPhoenixJson(paths);
+        } catch (Exception e) {
+            SQLException jsonException =
+                    new SQLException("path: " + Arrays.asList(paths) + " not found.");
+            assertEquals(jsonException.getMessage(), e.getMessage());
+            assertEquals(jsonException.getClass(), e.getClass());
+        }
+    }
+
+    @Test
+    public void getNullablePhoenixJson() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(TEST_JSON_STR);
+        PhoenixJson phoenixJson2 = phoenixJson.getPhoenixJsonOrNull(new String[] { "f2", "f3" });
+        assertEquals("value", phoenixJson2.serializeToString());
+
+        assertNull(phoenixJson.getPhoenixJsonOrNull(new String[] { "f2", "f3", "f4" }));
+        assertNotNull(phoenixJson.getPhoenixJsonOrNull(new String[] { "f4", "f6", "1" }));
+        assertNull(phoenixJson.getPhoenixJsonOrNull(new String[] { "f4", "f6", "3" }));
+        assertNull(phoenixJson.getPhoenixJsonOrNull(new String[] { "f4", "f6", "-1" }));
+    }
+
+    @Test
+    public void serializeToString() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(TEST_JSON_STR);
+        PhoenixJson phoenixJson2 = phoenixJson.getPhoenixJson(new String[] { "f4", "f5" });
+        assertEquals(new Integer(99).toString(), phoenixJson2.serializeToString());
+
+        PhoenixJson phoenixJson3 = phoenixJson.getPhoenixJson(new String[] { "f7" });
+        assertEquals(Boolean.TRUE.toString(), phoenixJson3.serializeToString());
+
+        PhoenixJson phoenixJson4 = phoenixJson.getPhoenixJson(new String[] { "f2", "f3" });
+        assertEquals("value", phoenixJson4.serializeToString());
+
+        PhoenixJson phoenixJson5 = phoenixJson.getPhoenixJson(new String[] { "f4", "f6" });
+
+        assertEquals("[1,true,\"foo\"]", phoenixJson5.serializeToString());
+    }
+
+    @Test
+    public void serializeToStringForJsonArray() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(TEST_JSON_STR);
+        PhoenixJson phoenixJson5 =
+                phoenixJson.getPhoenixJsonOrNull(new String[] { "f4", "f6", "0" });
+        assertEquals(new Integer(1).toString(), phoenixJson5.serializeToString());
+        phoenixJson5 = phoenixJson.getPhoenixJsonOrNull(new String[] { "f4", "f6", "1" });
+        assertEquals(Boolean.TRUE.toString(), phoenixJson5.serializeToString());
+        phoenixJson5 = phoenixJson.getPhoenixJsonOrNull(new String[] { "f4", "f6", "2" });
+        assertEquals("foo", phoenixJson5.serializeToString());
+    }
+
+    @Test
+    public void testToString() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(TEST_JSON_STR);
+        assertEquals(TEST_JSON_STR, phoenixJson.toString());
+    }
+
+    @Test
+    public void compareTo() throws Exception {
+        PhoenixJson phoenixJson1 = PhoenixJson.getInstance(TEST_JSON_STR);
+
+       try{
+        phoenixJson1.compareTo(phoenixJson1);
+       }catch(EqualityNotSupportedException x){
+           SQLException sqe =(SQLException)x.getCause();
+           assertEquals(SQLExceptionCode.NON_EQUALITY_COMPARISON.getErrorCode(), sqe.getErrorCode());
+       }
+       
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
index 85f9436..8d445ec 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
@@ -1714,6 +1714,7 @@
              + "FLOAT ARRAY=[BINARY ARRAY, DECIMAL ARRAY, DOUBLE ARRAY, FLOAT ARRAY, VARBINARY ARRAY], "
              + "INTEGER=[BIGINT, BINARY, DECIMAL, DOUBLE, FLOAT, INTEGER, VARBINARY], "
              + "INTEGER ARRAY=[BIGINT ARRAY, BINARY ARRAY, DECIMAL ARRAY, DOUBLE ARRAY, FLOAT ARRAY, INTEGER ARRAY, VARBINARY ARRAY], "
+             + "JSON=[JSON, VARCHAR], "
              + "SMALLINT=[BIGINT, BINARY, DECIMAL, DOUBLE, FLOAT, INTEGER, SMALLINT, VARBINARY], "
              + "SMALLINT ARRAY=[BIGINT ARRAY, BINARY ARRAY, DECIMAL ARRAY, DOUBLE ARRAY, FLOAT ARRAY, INTEGER ARRAY, SMALLINT ARRAY, VARBINARY ARRAY], "
              + "TIME=[BINARY, DATE, TIME, TIMESTAMP, VARBINARY], "
@@ -1742,7 +1743,7 @@
              + "UNSIGNED_TINYINT ARRAY=[BIGINT ARRAY, BINARY ARRAY, DECIMAL ARRAY, DOUBLE ARRAY, FLOAT ARRAY, INTEGER ARRAY, SMALLINT ARRAY, TINYINT ARRAY, UNSIGNED_DOUBLE ARRAY, UNSIGNED_FLOAT ARRAY, UNSIGNED_INT ARRAY, UNSIGNED_LONG ARRAY, UNSIGNED_SMALLINT ARRAY, UNSIGNED_TINYINT ARRAY, VARBINARY ARRAY], "
              + "VARBINARY=[BINARY, VARBINARY], "
              + "VARBINARY ARRAY=[BINARY ARRAY, VARBINARY ARRAY], "
-             + "VARCHAR=[BINARY, CHAR, VARBINARY, VARCHAR], "
+             + "VARCHAR=[BINARY, CHAR, JSON, VARBINARY, VARCHAR], "
              + "VARCHAR ARRAY=[BINARY ARRAY, CHAR ARRAY, VARBINARY ARRAY, VARCHAR ARRAY]}", 
              coercibleToMap.toString());
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PJsonTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PJsonTest.java
new file mode 100644
index 0000000..5939ef7
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PJsonTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.EqualityNotSupportedException;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TypeMismatchException;
+import org.apache.phoenix.schema.json.PhoenixJson;
+import org.apache.phoenix.schema.json.PhoenixJsonTest;
+import org.apache.phoenix.util.ByteUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link PJson}.
+ */
+public class PJsonTest {
+
+    final byte[] json = PhoenixJsonTest.TEST_JSON_STR.getBytes();
+
+    @Test
+    public void testToBytesWithOffset() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(PhoenixJsonTest.TEST_JSON_STR);
+
+        byte[] bytes = new byte[json.length];
+
+        assertEquals(json.length, PJson.INSTANCE.toBytes(phoenixJson, bytes, 0));
+
+        assertArrayEquals(json, bytes);
+    }
+
+    @Test
+    public void testToBytes() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(PhoenixJsonTest.TEST_JSON_STR);
+
+        byte[] bytes = PJson.INSTANCE.toBytes(phoenixJson);
+        assertArrayEquals(json, bytes);
+    }
+    
+    @Test
+    public void testToBytesForNull() throws Exception {
+         assertEquals(ByteUtil.EMPTY_BYTE_ARRAY,PJson.INSTANCE.toBytes(null));
+         assertEquals(ByteUtil.EMPTY_BYTE_ARRAY,PJson.INSTANCE.toBytes(null, SortOrder.ASC));
+         assertEquals(ByteUtil.EMPTY_BYTE_ARRAY,PJson.INSTANCE.toBytes(null, SortOrder.DESC));
+    }
+
+    @Test
+    public void testToObjectWithSortOrder() {
+        Object object =
+                PJson.INSTANCE.toObject(json, 0, json.length, PVarchar.INSTANCE, SortOrder.ASC,
+                    Integer.MAX_VALUE, Integer.MAX_VALUE);
+        PhoenixJson phoenixJson = (PhoenixJson) object;
+        assertEquals(PhoenixJsonTest.TEST_JSON_STR, phoenixJson.toString());
+    }
+
+    @Test
+    public void testToObject() {
+        Object object =
+                PJson.INSTANCE.toObject(json, 0, json.length, PVarchar.INSTANCE, SortOrder.ASC,
+                    Integer.MAX_VALUE, Integer.MAX_VALUE);
+        PhoenixJson phoenixJson = (PhoenixJson) object;
+        assertEquals(PhoenixJsonTest.TEST_JSON_STR, phoenixJson.toString());
+
+        Object object2 = PJson.INSTANCE.toObject(phoenixJson, PJson.INSTANCE);
+        assertEquals(phoenixJson.toString(), object2.toString());
+
+        PJson.INSTANCE.toObject(PhoenixJsonTest.TEST_JSON_STR, PVarchar.INSTANCE);
+        assertEquals(phoenixJson.toString(), object2.toString());
+
+        try {
+            PJson.INSTANCE.toObject(PhoenixJsonTest.TEST_JSON_STR, PChar.INSTANCE);
+        } catch (ConstraintViolationException sqe) {
+            TypeMismatchException e = (TypeMismatchException) sqe.getCause();
+            assertEquals(SQLExceptionCode.TYPE_MISMATCH.getErrorCode(), e.getErrorCode());
+        }
+
+        PhoenixJson object4 = (PhoenixJson) PJson.INSTANCE.toObject(PhoenixJsonTest.TEST_JSON_STR);
+        assertEquals(phoenixJson.toString(), object4.toString());
+        
+    }
+
+    @Test
+    public void testToObjectForNull(){
+        String jsonStr= null;
+        assertNull(PJson.INSTANCE.toObject(jsonStr));
+        
+        Object jsonObj = null;
+        assertNull(PJson.INSTANCE.toObject(jsonObj, PJson.INSTANCE));
+    }
+
+    @Test
+    public void isFixedWidth() {
+        assertFalse(PJson.INSTANCE.isFixedWidth());
+    }
+
+    public void getByteSize() {
+        assertNull(PJson.INSTANCE.getByteSize());
+    }
+
+    public void estimateByteSize() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(PhoenixJsonTest.TEST_JSON_STR);
+        assertEquals(PhoenixJsonTest.TEST_JSON_STR.length(),
+            PJson.INSTANCE.estimateByteSize(phoenixJson));
+    }
+
+    @Test
+    public void compareTo() throws Exception {
+        PhoenixJson phoenixJson1 = PhoenixJson.getInstance(PhoenixJsonTest.TEST_JSON_STR);
+
+        PhoenixJson phoenixJson2 = PhoenixJson.getInstance(PhoenixJsonTest.TEST_JSON_STR);
+
+        try{
+            assertEquals(0, PJson.INSTANCE.compareTo(phoenixJson1, phoenixJson2, PJson.INSTANCE));
+            Assert.fail("Comparision on PJson should have thrown ConstraintViolationException");
+        }catch(EqualityNotSupportedException x){
+            SQLException sqe = (SQLException) x.getCause();
+            assertEquals(SQLExceptionCode.NON_EQUALITY_COMPARISON.getErrorCode(),
+                sqe.getErrorCode());
+        }
+
+        try {
+            PJson.INSTANCE
+                    .compareTo(phoenixJson1, PhoenixJsonTest.TEST_JSON_STR, PVarchar.INSTANCE);
+            Assert.fail("Comparision on PJson should have thrown ConstraintViolationException");
+        } catch (ConstraintViolationException x) {
+            TypeMismatchException e = (TypeMismatchException) x.getCause();
+            assertEquals(SQLExceptionCode.TYPE_MISMATCH.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void isBytesComparableWith() {
+        assertTrue(PJson.INSTANCE.isBytesComparableWith(PJson.INSTANCE));
+        assertTrue(PJson.INSTANCE.isBytesComparableWith(PVarchar.INSTANCE));
+        assertFalse(PJson.INSTANCE.isBytesComparableWith(PChar.INSTANCE));
+    }
+
+    @Test
+    public void toStringLiteral() throws Exception {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(PhoenixJsonTest.TEST_JSON_STR);
+        String stringLiteral = PJson.INSTANCE.toStringLiteral(phoenixJson, null);
+        assertEquals(PVarchar.INSTANCE.toStringLiteral(PhoenixJsonTest.TEST_JSON_STR, null),
+            stringLiteral);
+    }
+
+    @Test
+    public void coerceBytes() throws SQLException {
+        PhoenixJson phoenixJson = PhoenixJson.getInstance(PhoenixJsonTest.TEST_JSON_STR);
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(json, 0, 0);
+        PJson.INSTANCE.coerceBytes(ptr, phoenixJson, PJson.INSTANCE, json.length, new Integer(10),
+            SortOrder.ASC, json.length, new Integer(10), SortOrder.ASC);
+        assertEquals(0, ptr.getLength());
+
+        ptr.set(json);
+        PJson.INSTANCE.coerceBytes(ptr, phoenixJson, PVarchar.INSTANCE, json.length,
+            new Integer(10), SortOrder.ASC, json.length, new Integer(10), SortOrder.ASC);
+        assertArrayEquals(json, ptr.get());
+    }
+
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java
index b7f0773..6ee7da5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java
@@ -50,6 +50,7 @@
              + "FLOAT ARRAY=FLOAT ARRAY, "
              + "INTEGER=INTEGER, "
              + "INTEGER ARRAY=INTEGER ARRAY, "
+             + "JSON=JSON, "
              + "SMALLINT=SMALLINT, "
              + "SMALLINT ARRAY=SMALLINT ARRAY, "
              + "TIME=TIME, "
@@ -103,6 +104,7 @@
              + "FLOAT ARRAY=FLOAT ARRAY, "
              + "INTEGER=DECIMAL, "
              + "INTEGER ARRAY=INTEGER ARRAY, "
+             + "JSON=JSON, "
              + "SMALLINT=DECIMAL, "
              + "SMALLINT ARRAY=SMALLINT ARRAY, "
              + "TIME=DECIMAL, "
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
index b6465c3..920f78e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
@@ -37,6 +37,7 @@
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
@@ -132,7 +133,17 @@
                 // create a table by using the type name as returned by PDataType
                 sb.append("CREATE TABLE " + tableName + " (");
                 sb.append(columnName + " " + sqlTypeName + " NOT NULL PRIMARY KEY, V1 VARCHAR)");
-                conn.createStatement().execute(sb.toString());
+                if(!pType.canBePrimaryKey()) {
+	                try{
+	                    conn.createStatement().execute(sb.toString());
+	                    fail("<"+pType.toString()+ "> should throw exception since primary key is not supported");
+	                }catch(SQLException sqe){
+	                    assertEquals(SQLExceptionCode.INVALID_PRIMARY_KEY_CONSTRAINT.getErrorCode(), sqe.getErrorCode());
+	                }
+	                continue;
+	            }else {
+	                conn.createStatement().execute(sb.toString());
+	            }
 
                 // generate the optimized query plan by going through the pk of the table.
                 PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE " + columnName  + " = ?");