PHOENIX-628 Support native JSON data type (#1780)

diff --git a/phoenix-core-client/pom.xml b/phoenix-core-client/pom.xml
index 77890fc..c036d1e 100644
--- a/phoenix-core-client/pom.xml
+++ b/phoenix-core-client/pom.xml
@@ -182,6 +182,7 @@
           <excludes>
             <exclude>src/main/java/org/apache/phoenix/coprocessor/generated/*.java</exclude>
             <exclude>src/main/resources/META-INF/services/java.sql.Driver</exclude>
+            <exclude>src/it/resources/json/*.json</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -287,6 +288,19 @@
       <artifactId>zookeeper-jute</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.jayway.jsonpath</groupId>
+      <artifactId>json-path</artifactId>
+      <version>2.6.0</version>
+    </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.mongodb/bson -->
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>bson</artifactId>
+      <version>4.4.0</version>
+    </dependency>
+
     <!-- Transaction dependencies -->
     <!-- Omid dependencies -->
     <dependency>
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 35d193d..d640d1a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -47,6 +47,7 @@
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.schema.types.PDateArray;
+import org.apache.phoenix.schema.types.PJson;
 import org.apache.phoenix.schema.types.PNumericType;
 import org.apache.phoenix.schema.types.PTime;
 import org.apache.phoenix.schema.types.PTimeArray;
@@ -110,6 +111,8 @@
             return "ARRAY[" + getValue(PDate.INSTANCE) + "]";
         } else if (type instanceof PArrayDataType) {
             return "ARRAY" + type.getSampleValue().toString();
+        } else if (type instanceof PJson) {
+            return "'{a:1}'";
         } else {
             return "0123";
         }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index 67e1a82..cd5dfef 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -89,6 +89,7 @@
 import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunctionInfo;
 import org.apache.phoenix.parse.InListParseNode;
 import org.apache.phoenix.parse.IsNullParseNode;
+import org.apache.phoenix.parse.JsonQueryParseNode;
 import org.apache.phoenix.parse.LikeParseNode;
 import org.apache.phoenix.parse.LikeParseNode.LikeType;
 import org.apache.phoenix.parse.LiteralParseNode;
@@ -116,7 +117,6 @@
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
@@ -142,6 +142,7 @@
 
 public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expression> {
     private boolean isAggregate;
+    private boolean isJsonFragment;
     protected ParseNode aggregateFunction;
     protected final StatementContext context;
     protected final GroupBy groupBy;
@@ -172,6 +173,10 @@
         return isAggregate;
     }
 
+    public boolean isJsonFragment() {
+        return isJsonFragment;
+    }
+
     public boolean isTopLevel() {
         return nodeCount == 0;
     }
@@ -180,6 +185,7 @@
         this.isAggregate = false;
         this.nodeCount = 0;
         this.totalNodeCount = 0;
+        this.isJsonFragment = false;
     }
 
     @Override
@@ -202,6 +208,18 @@
         ParseNode rhsNode = node.getChildren().get(1);
         Expression lhsExpr = children.get(0);
         Expression rhsExpr = children.get(1);
+
+        PDataType dataTypeOfLHSExpr = lhsExpr.getDataType();
+        if (dataTypeOfLHSExpr != null && !dataTypeOfLHSExpr.isComparisonSupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.COMPARISON_UNSUPPORTED)
+                    .setMessage(" for type " + dataTypeOfLHSExpr).build().buildException();
+        }
+        PDataType dataTypeOfRHSExpr = rhsExpr.getDataType();
+        if (dataTypeOfRHSExpr != null && !dataTypeOfRHSExpr.isComparisonSupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.COMPARISON_UNSUPPORTED)
+                    .setMessage(" for type " + dataTypeOfRHSExpr).build().buildException();
+        }
+
         CompareOperator op = node.getFilterOp();
 
         if (lhsNode instanceof RowValueConstructorParseNode && rhsNode instanceof RowValueConstructorParseNode) {
@@ -278,10 +296,14 @@
 
     @Override
     public boolean visitEnter(FunctionParseNode node) throws SQLException {
+        if (node instanceof JsonQueryParseNode) {
+            this.isJsonFragment = true;
+        }
         // TODO: Oracle supports nested aggregate function while other DBs don't. Should we?
         if (node.isAggregate()) {
             if (aggregateFunction != null) {
-                throw new SQLFeatureNotSupportedException("Nested aggregate functions are not supported");
+                throw new SQLFeatureNotSupportedException(
+                        "Nested aggregate functions are not supported");
             }
             this.aggregateFunction = node;
             this.isAggregate = true;
@@ -484,7 +506,15 @@
         ParseNode rhsNode = node.getChildren().get(1);
         Expression lhs = children.get(0);
         Expression rhs = children.get(1);
-        if ( rhs.getDataType() != null && lhs.getDataType() != null && 
+        if (lhs.getDataType() != null && !lhs.getDataType().isComparisonSupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.COMPARISON_UNSUPPORTED).setMessage(
+                    " for type " + lhs.getDataType()).build().buildException();
+        }
+        if (rhs.getDataType() != null && !rhs.getDataType().isComparisonSupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.COMPARISON_UNSUPPORTED).setMessage(
+                    " for type " + rhs.getDataType()).build().buildException();
+        }
+        if ( rhs.getDataType() != null && lhs.getDataType() != null &&
                 !lhs.getDataType().isCoercibleTo(rhs.getDataType())  && 
                 !rhs.getDataType().isCoercibleTo(lhs.getDataType())) {
             throw TypeMismatchException.newException(lhs.getDataType(), rhs.getDataType(), node.toString());
@@ -639,7 +669,12 @@
         ImmutableBytesWritable ptr = context.getTempPtr();
         PDataType firstChildType = firstChild.getDataType();
         ParseNode firstChildNode = node.getChildren().get(0);
-        
+
+        if (firstChildType != null && !firstChildType.isComparisonSupported()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.COMPARISON_UNSUPPORTED)
+                    .setMessage(" for type " + firstChildType).build().buildException();
+        }
+
         if (firstChildNode instanceof BindParseNode) {
             PDatum datum = firstChild;
             if (firstChildType == null) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 7ac1120..9e0f928 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -47,6 +47,8 @@
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
+import org.apache.phoenix.expression.function.JsonQueryFunction;
+import org.apache.phoenix.expression.function.JsonValueFunction;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
 import org.apache.phoenix.expression.visitor.ProjectedColumnExpressionVisitor;
 import org.apache.phoenix.expression.visitor.ReplaceArrayFunctionExpressionVisitor;
@@ -85,6 +87,7 @@
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -378,16 +381,18 @@
     public static RowProjector compile(StatementContext context, SelectStatement statement,
             GroupBy groupBy, List<? extends PDatum> targetColumns, Expression where,
             boolean wildcardIncludesDynamicCols) throws SQLException {
-        List<KeyValueColumnExpression> arrayKVRefs = new ArrayList<>();
-        List<ProjectedColumnExpression> arrayProjectedColumnRefs = new ArrayList<>();
-        List<Expression> arrayKVFuncs = new ArrayList<>();
-        List<Expression> arrayOldFuncs = new ArrayList<>();
-        Map<Expression, Integer> arrayExpressionCounts = new HashMap<>();
+        List<KeyValueColumnExpression> serverParsedKVRefs = new ArrayList<>();
+        List<ProjectedColumnExpression> serverParsedProjectedColumnRefs = new ArrayList<>();
+        List<Expression> serverParsedKVFuncs = new ArrayList<>();
+        List<Expression> serverParsedOldFuncs = new ArrayList<>();
+        Map<Expression, Integer> serverParsedExpressionCounts = new HashMap<>();
         List<AliasedNode> aliasedNodes = statement.getSelect();
         // Setup projected columns in Scan
-        SelectClauseVisitor selectVisitor = new SelectClauseVisitor(context, groupBy, arrayKVRefs,
-                arrayKVFuncs, arrayExpressionCounts, arrayProjectedColumnRefs, arrayOldFuncs,
-                statement);
+        SelectClauseVisitor
+                selectVisitor =
+                new SelectClauseVisitor(context, groupBy, serverParsedKVRefs, serverParsedKVFuncs,
+                        serverParsedExpressionCounts, serverParsedProjectedColumnRefs,
+                        serverParsedOldFuncs, statement);
         List<ExpressionProjector> projectedColumns = new ArrayList<>();
         ColumnResolver resolver = context.getResolver();
         TableRef tableRef = context.getCurrentTable();
@@ -484,42 +489,99 @@
             index++;
         }
 
-        for (int i = arrayProjectedColumnRefs.size() - 1; i >= 0; i--) {
-            Expression expression = arrayProjectedColumnRefs.get(i);
-            Integer count = arrayExpressionCounts.get(expression);
+        for (int i = serverParsedProjectedColumnRefs.size() - 1; i >= 0; i--) {
+            Expression expression = serverParsedProjectedColumnRefs.get(i);
+            Integer count = serverParsedExpressionCounts.get(expression);
             if (count != 0) {
-                arrayKVRefs.remove(i);
-                arrayKVFuncs.remove(i);
-                arrayOldFuncs.remove(i);
+                serverParsedKVRefs.remove(i);
+                serverParsedKVFuncs.remove(i);
+                serverParsedOldFuncs.remove(i);
             }
         }
 
-        if (arrayKVFuncs.size() > 0 && arrayKVRefs.size() > 0) {
-            serailizeArrayIndexInformationAndSetInScan(context, arrayKVFuncs, arrayKVRefs);
+        if (serverParsedKVFuncs.size() > 0 && serverParsedKVRefs.size() > 0) {
+            String[]
+                    scanAttributes =
+                    new String[] { BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX,
+                            BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION,
+                            BaseScannerRegionObserverConstants.JSON_QUERY_FUNCTION };
+            Map<String, Class> attributeToFunctionMap = new HashMap<String, Class>() {{
+                put(scanAttributes[0], ArrayIndexFunction.class);
+                put(scanAttributes[1], JsonValueFunction.class);
+                put(scanAttributes[2], JsonQueryFunction.class);
+            }};
+            // This map is to keep track of the positions that get swapped with rearranging
+            // the functions in the serialized data to server.
+            Map<Integer, Integer> initialToShuffledPositionMap = new HashMap<>();
+            Map<String, List<Expression>>
+                    serverAttributeToFuncExpressionMap =
+                    new HashMap<String, List<Expression>>() {{
+                        for (String attribute : attributeToFunctionMap.keySet()) {
+                            put(attribute, new ArrayList<>());
+                        }
+                    }};
+            Map<String, List<KeyValueColumnExpression>>
+                    serverAttributeToKVExpressionMap =
+                    new HashMap<String, List<KeyValueColumnExpression>>() {{
+                        for (String attribute : attributeToFunctionMap.keySet()) {
+                            put(attribute, new ArrayList<>());
+                        }
+                    }};
+            int counter = 0;
+            for (String attribute : scanAttributes) {
+                for (int i = 0; i < serverParsedKVFuncs.size(); i++) {
+                    if (attributeToFunctionMap.get(attribute)
+                            .isInstance(serverParsedKVFuncs.get(i))) {
+                        initialToShuffledPositionMap.put(i, counter++);
+                        serverAttributeToFuncExpressionMap.get(attribute)
+                                .add(serverParsedKVFuncs.get(i));
+                        serverAttributeToKVExpressionMap.get(attribute)
+                                .add(serverParsedKVRefs.get(i));
+                    }
+                }
+            }
+            for (Map.Entry<String, Class> entry : attributeToFunctionMap.entrySet()) {
+                if (serverAttributeToFuncExpressionMap.get(entry.getKey()).size() > 0) {
+                    serializeServerParsedExpressionInformationAndSetInScan(context, entry.getKey(),
+                            serverAttributeToFuncExpressionMap.get(entry.getKey()),
+                            serverAttributeToKVExpressionMap.get(entry.getKey()));
+                }
+            }
             KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
-            for (Expression expression : arrayKVRefs) {
+            for (Expression expression : serverParsedKVRefs) {
                 builder.addField(expression);
             }
             KeyValueSchema kvSchema = builder.build();
             ValueBitSet arrayIndexesBitSet = ValueBitSet.newInstance(kvSchema);
             builder = new KeyValueSchemaBuilder(0);
-            for (Expression expression : arrayKVFuncs) {
+            for (Expression expression : serverParsedKVFuncs) {
                 builder.addField(expression);
             }
             KeyValueSchema arrayIndexesSchema = builder.build();
 
             Map<Expression, Expression> replacementMap = new HashMap<>();
-            for(int i = 0; i < arrayOldFuncs.size(); i++){
-                Expression function =arrayKVFuncs.get(i);
-                replacementMap.put(arrayOldFuncs.get(i), new ArrayIndexExpression(i, function.getDataType(), arrayIndexesBitSet, arrayIndexesSchema));
+            for (int i = 0; i < serverParsedOldFuncs.size(); i++) {
+                Expression function = serverParsedKVFuncs.get(i);
+                replacementMap.put(serverParsedOldFuncs.get(i),
+                        new ArrayIndexExpression(initialToShuffledPositionMap.get(i),
+                                function.getDataType(), arrayIndexesBitSet, arrayIndexesSchema));
+
             }
 
-            ReplaceArrayFunctionExpressionVisitor visitor = new ReplaceArrayFunctionExpressionVisitor(replacementMap);
+            ReplaceArrayFunctionExpressionVisitor
+                    visitor =
+                    new ReplaceArrayFunctionExpressionVisitor(replacementMap);
             for (int i = 0; i < projectedColumns.size(); i++) {
                 ExpressionProjector projector = projectedColumns.get(i);
-                projectedColumns.set(i, new ExpressionProjector(projector.getName(),
-                        projector.getLabel(),
-                        tableRef.getTableAlias() == null ? (table.getName() == null ? "" : table.getName().getString()) : tableRef.getTableAlias(), projector.getExpression().accept(visitor), projector.isCaseSensitive()));
+                projectedColumns.set(i,
+                        new ExpressionProjector(projector.getName(), projector.getLabel(),
+                                tableRef.getTableAlias() == null ?
+                                        (table.getName() == null ?
+                                                "" :
+                                                table.getName().getString()) :
+                                        tableRef.getTableAlias(),
+                                projector.getExpression().accept(visitor),
+                                projector.isCaseSensitive()));
             }
         }
 
@@ -628,19 +690,24 @@
             return null;
         }
     }
-    private static void serailizeArrayIndexInformationAndSetInScan(StatementContext context, List<Expression> arrayKVFuncs,
-            List<KeyValueColumnExpression> arrayKVRefs) {
+
+    private static void serializeServerParsedExpressionInformationAndSetInScan(
+            StatementContext context, String serverParsedExpressionAttribute,
+            List<Expression> serverParsedKVFuncs,
+            List<KeyValueColumnExpression> serverParsedKVRefs) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
             DataOutputStream output = new DataOutputStream(stream);
-            // Write the arrayKVRef size followed by the keyvalues that needs to be of type arrayindex function
-            WritableUtils.writeVInt(output, arrayKVRefs.size());
-            for (Expression expression : arrayKVRefs) {
+            // Write the KVRef size followed by the keyvalues that needs to be of
+            // type arrayindex or json function based on serverParsedExpressionAttribute
+            WritableUtils.writeVInt(output, serverParsedKVRefs.size());
+            for (Expression expression : serverParsedKVRefs) {
                     expression.write(output);
             }
-            // then write the number of arrayindex functions followeed by the expression itself
-            WritableUtils.writeVInt(output, arrayKVFuncs.size());
-            for (Expression expression : arrayKVFuncs) {
+            // then write the number of arrayindex or json functions followed
+            // by the expression itself
+            WritableUtils.writeVInt(output, serverParsedKVFuncs.size());
+            for (Expression expression : serverParsedKVFuncs) {
                     expression.write(output);
             }
             
@@ -653,7 +720,7 @@
                 throw new RuntimeException(e);
             }
         }
-        context.getScan().setAttribute(BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX, stream.toByteArray());
+        context.getScan().setAttribute(serverParsedExpressionAttribute, stream.toByteArray());
     }
 
     private static class SelectClauseVisitor extends ExpressionCompiler {
@@ -664,21 +731,27 @@
          */
         private boolean isCaseSensitive;
         private int elementCount;
-        private List<KeyValueColumnExpression> arrayKVRefs;
-        private List<Expression> arrayKVFuncs;
-        private List<Expression> arrayOldFuncs;
-        private List<ProjectedColumnExpression> arrayProjectedColumnRefs;
-        private Map<Expression, Integer> arrayExpressionCounts;
-        private SelectStatement statement; 
-        
-        private SelectClauseVisitor(StatementContext context, GroupBy groupBy, 
-                List<KeyValueColumnExpression> arrayKVRefs, List<Expression> arrayKVFuncs, Map<Expression, Integer> arrayExpressionCounts, List<ProjectedColumnExpression> arrayProjectedColumnRefs, List<Expression> arrayOldFuncs, SelectStatement statement) {
+        // Looks at PHOENIX-2160 for the context and use of the below variables.
+        // These are used for reference counting and converting to KeyValueColumnExpressions
+        private List<KeyValueColumnExpression> serverParsedKVRefs;
+        private List<Expression> serverParsedKVFuncs;
+        private List<Expression> serverParsedOldFuncs;
+        private List<ProjectedColumnExpression> serverParsedProjectedColumnRefs;
+        private Map<Expression, Integer> serverParsedExpressionCounts;
+        private SelectStatement statement;
+
+        private SelectClauseVisitor(StatementContext context, GroupBy groupBy,
+                List<KeyValueColumnExpression> serverParsedKVRefs,
+                List<Expression> serverParsedKVFuncs,
+                Map<Expression, Integer> serverParsedExpressionCounts,
+                List<ProjectedColumnExpression> serverParsedProjectedColumnRefs,
+                List<Expression> serverParsedOldFuncs, SelectStatement statement) {
             super(context, groupBy);
-            this.arrayKVRefs = arrayKVRefs;
-            this.arrayKVFuncs = arrayKVFuncs;
-            this.arrayOldFuncs = arrayOldFuncs;
-            this.arrayExpressionCounts = arrayExpressionCounts;
-            this.arrayProjectedColumnRefs = arrayProjectedColumnRefs;
+            this.serverParsedKVRefs = serverParsedKVRefs;
+            this.serverParsedKVFuncs = serverParsedKVFuncs;
+            this.serverParsedOldFuncs = serverParsedOldFuncs;
+            this.serverParsedExpressionCounts = serverParsedExpressionCounts;
+            this.serverParsedProjectedColumnRefs = serverParsedProjectedColumnRefs;
             this.statement = statement;
             reset();
         }
@@ -700,13 +773,18 @@
         @Override
         public Expression visit(ColumnParseNode node) throws SQLException {
             Expression expression = super.visit(node);
-            if (expression.getDataType().isArrayType()) {
-                Integer count = arrayExpressionCounts.get(expression);
-                arrayExpressionCounts.put(expression, count != null ? (count + 1) : 1);
+            if (parseOnServer(expression)) {
+                Integer count = serverParsedExpressionCounts.get(expression);
+                serverParsedExpressionCounts.put(expression, count != null ? (count + 1) : 1);
             }
             return expression;
         }
-        
+
+        private static boolean parseOnServer(Expression expression) {
+            return expression.getDataType().isArrayType() || expression.getDataType()
+                    .equals(PJson.INSTANCE);
+        }
+
         @Override
         public void addElement(List<Expression> l, Expression element) {
             elementCount++;
@@ -727,57 +805,65 @@
         @Override
         public Expression visitLeave(FunctionParseNode node, final List<Expression> children) throws SQLException {
 
-            // this need not be done for group by clause with array. Hence the below check
-            if (!statement.isAggregate() && ArrayIndexFunction.NAME.equals(node.getName()) && children.get(0) instanceof ProjectedColumnExpression) {
-                 final List<KeyValueColumnExpression> indexKVs = Lists.newArrayList();
-                 final List<ProjectedColumnExpression> indexProjectedColumns = Lists.newArrayList();
-                 final List<Expression> copyOfChildren = new ArrayList<>(children);
-                 // Create anon visitor to find reference to array in a generic way
-                 children.get(0).accept(new ProjectedColumnExpressionVisitor() {
-                     @Override
-                     public Void visit(ProjectedColumnExpression expression) {
-                         if (expression.getDataType().isArrayType()) {
-                             indexProjectedColumns.add(expression);
-                             PColumn col = expression.getColumn();
-                             // hack'ish... For covered columns with local indexes we defer to the server.
-                             if (col instanceof ProjectedColumn && ((ProjectedColumn) col)
-                                     .getSourceColumnRef() instanceof IndexUncoveredDataColumnRef) {
-                                 return null;
-                             }
-                             PTable table = context.getCurrentTable().getTable();
-                             KeyValueColumnExpression keyValueColumnExpression;
-                             if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+            // this need not be done for group by clause with array or json. Hence, the below check
+            if (!statement.isAggregate() && (ArrayIndexFunction.NAME.equals(
+                    node.getName()) || isJsonFunction(node)) &&
+                    children.get(0) instanceof ProjectedColumnExpression) {
+                final List<KeyValueColumnExpression> indexKVs = Lists.newArrayList();
+                final List<ProjectedColumnExpression> indexProjectedColumns = Lists.newArrayList();
+                final List<Expression> copyOfChildren = new ArrayList<>(children);
+                // Create anon visitor to find reference to array or json in a generic way
+                children.get(0).accept(new ProjectedColumnExpressionVisitor() {
+                    @Override
+                    public Void visit(ProjectedColumnExpression expression) {
+                        if (expression.getDataType().isArrayType() || expression.getDataType()
+                                .equals(PJson.INSTANCE)) {
+                            indexProjectedColumns.add(expression);
+                            PColumn col = expression.getColumn();
+                            // hack'ish... For covered columns with local indexes we defer to the server.
+                            if (col instanceof ProjectedColumn && ((ProjectedColumn) col).getSourceColumnRef() instanceof IndexUncoveredDataColumnRef) {
+                                return null;
+                            }
+                            PTable table = context.getCurrentTable().getTable();
+                            KeyValueColumnExpression keyValueColumnExpression;
+                            if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
                                 keyValueColumnExpression =
                                         new SingleCellColumnExpression(col,
                                                 col.getName().getString(),
                                                 table.getEncodingScheme(),
                                                 table.getImmutableStorageScheme());
-                             } else {
-                                 keyValueColumnExpression = new KeyValueColumnExpression(col);
-                             }
-                             indexKVs.add(keyValueColumnExpression);
-                             copyOfChildren.set(0, keyValueColumnExpression);
-                             Integer count = arrayExpressionCounts.get(expression);
-                             arrayExpressionCounts.put(expression, count != null ? (count - 1) : -1);
-                         }
-                         return null;
-                     }
-                 });
+                            } else {
+                                keyValueColumnExpression = new KeyValueColumnExpression(col);
+                            }
+                            indexKVs.add(keyValueColumnExpression);
+                            copyOfChildren.set(0, keyValueColumnExpression);
+                            Integer count = serverParsedExpressionCounts.get(expression);
+                            serverParsedExpressionCounts.put(expression,
+                                    count != null ? (count - 1) : -1);
+                        }
+                        return null;
+                    }
+                });
 
-                 Expression func = super.visitLeave(node,children);
-                 // Add the keyvalues which is of type array
-                 if (!indexKVs.isEmpty()) {
-                    arrayKVRefs.addAll(indexKVs);
-                    arrayProjectedColumnRefs.addAll(indexProjectedColumns);
+                Expression func = super.visitLeave(node, children);
+                // Add the keyvalues which is of type array or json
+                if (!indexKVs.isEmpty()) {
+                    serverParsedKVRefs.addAll(indexKVs);
+                    serverParsedProjectedColumnRefs.addAll(indexProjectedColumns);
                     Expression funcModified = super.visitLeave(node, copyOfChildren);
-                    // Track the array index function also
-                    arrayKVFuncs.add(funcModified);
-                    arrayOldFuncs.add(func);
+                    // Track the array index or json function also
+                    serverParsedKVFuncs.add(funcModified);
+                    serverParsedOldFuncs.add(func);
                 }
                 return func;
             } else {
-                return super.visitLeave(node,children);
+                return super.visitLeave(node, children);
             }
         }
     }
+
+    private static boolean isJsonFunction(FunctionParseNode node) {
+        return JsonValueFunction.NAME.equals(node.getName()) || JsonQueryFunction.NAME.equals(
+                node.getName());
+    }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index 9446d37..f74bee1 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -90,6 +90,8 @@
     public static final String INDEX_FILTER = "_IndexFilter";
     public static final String INDEX_LIMIT = "_IndexLimit";
     public static final String INDEX_FILTER_STR = "_IndexFilterStr";
+    public static final String JSON_VALUE_FUNCTION = "_JsonValueFunction";
+    public static final String JSON_QUERY_FUNCTION = "_JsonQueryFunction";
 
     /*
      * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5e76e47..4eddb4d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -201,7 +201,12 @@
     CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915",
         "Cannot use SCN to look further back in the past beyond the configured max lookback age"),
 
-     /**
+    COMPARISON_UNSUPPORTED(539, "42915", "Comparison not supported for the datatype."),
+    INVALID_JSON_DATA(540, "42916", "Invalid json data."),
+    JSON_FRAGMENT_NOT_ALLOWED_IN_INDEX_EXPRESSION(541, "42917",
+            "Functions returning JSON fragments are not allowed in Index Expression."),
+
+    /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.
      *
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index f80a4c2..3ee1459 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -192,7 +192,10 @@
     CosFunction(CosFunction.class),
     TanFunction(TanFunction.class),
     RowKeyBytesStringFunction(RowKeyBytesStringFunction.class),
-    PhoenixRowTimestampFunction(PhoenixRowTimestampFunction.class)
+    PhoenixRowTimestampFunction(PhoenixRowTimestampFunction.class),
+    JsonValueFunction(JsonValueFunction.class),
+    JsonQueryFunction(JsonQueryFunction.class),
+    JsonExistsFunction(JsonExistsFunction.class)
     ;
 
     ExpressionType(Class<? extends Expression> clazz) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java
index e27973a..5f73fb0 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DistinctCountAggregateFunction.java
@@ -28,6 +28,7 @@
 import org.apache.phoenix.parse.DistinctCountParseNode;
 import org.apache.phoenix.parse.FunctionParseNode.Argument;
 import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.ComparisonNotSupportedException;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
@@ -100,6 +101,11 @@
     
     @Override
     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        for (Expression child : getChildren()) {
+            if (child.getDataType() != null && !child.getDataType().isComparisonSupported()) {
+                throw new ComparisonNotSupportedException(child.getDataType());
+            }
+        }
         // TODO: optimize query plan of this to run scan serially for a limit of one row
         if (!super.evaluate(tuple, ptr)) {
             ptr.set(ZERO); // If evaluate returns false, then no rows were found, so result is 0
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonExistsFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonExistsFunction.java
new file mode 100644
index 0000000..5b82361
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonExistsFunction.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode;
+import org.apache.phoenix.parse.JsonExistsParseNode;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.json.JsonDataFormat;
+import org.apache.phoenix.util.json.JsonDataFormatFactory;
+
+import java.util.List;
+
+/**
+ * Built-in function for JSON_EXISTS JSON_EXISTS(<column_with_json/json_string>, <path>) JSON_EXISTS
+ * determines whether a JSON value satisfies a search criterion.
+ */
+@FunctionParseNode.BuiltInFunction(name = JsonExistsFunction.NAME,
+        nodeClass = JsonExistsParseNode.class,
+        args = { @FunctionParseNode.Argument(allowedTypes = { PJson.class, PVarbinary.class }),
+                @FunctionParseNode.Argument(allowedTypes = { PVarchar.class }) })
+public class JsonExistsFunction extends ScalarFunction {
+
+    public static final String NAME = "JSON_EXISTS";
+    private final JsonDataFormat
+            jsonDataFormat =
+            JsonDataFormatFactory.getJsonDataFormat(JsonDataFormatFactory.DataFormat.BSON);
+
+    // This is called from ExpressionType newInstance
+    public JsonExistsFunction() {
+
+    }
+
+    public JsonExistsFunction(List<Expression> children) {
+        super(children);
+        Preconditions.checkNotNull(getJSONPathExpr());
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (!getColValExpr().evaluate(tuple, ptr)) {
+            return false;
+        }
+        if (ptr == null || ptr.getLength() == 0) {
+            return false;
+        }
+
+        // Column name or JSON string
+        Object top = PJson.INSTANCE.toObject(ptr, getColValExpr().getSortOrder());
+
+        if (!getJSONPathExpr().evaluate(tuple, ptr)) {
+            return false;
+        }
+
+        if (ptr.getLength() == 0) {
+            return false;
+        }
+
+        String
+                jsonPathExprStr =
+                (String) PVarchar.INSTANCE.toObject(ptr, getJSONPathExpr().getSortOrder());
+        if (jsonPathExprStr == null) {
+            return false;
+        }
+
+        boolean isPathValid = jsonDataFormat.isPathValid(top, jsonPathExprStr);
+        ptr.set(PBoolean.INSTANCE.toBytes(isPathValid));
+        return true;
+    }
+
+    private Expression getColValExpr() {
+        return getChildren().get(0);
+    }
+
+    private Expression getJSONPathExpr() {
+        return getChildren().get(1);
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PBoolean.INSTANCE;
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonQueryFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonQueryFunction.java
new file mode 100644
index 0000000..a2a3d00
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonQueryFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode;
+import org.apache.phoenix.parse.JsonQueryParseNode;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.json.JsonDataFormat;
+import org.apache.phoenix.util.json.JsonDataFormatFactory;
+
+import java.sql.Types;
+import java.util.List;
+
+/**
+ * Built-in function for JSON_QUERY JSON_QUERY(<column_with_json/json_string>, <path> [returning
+ * <type>]) Extracts an object or an array from a JSON string.
+ */
+@FunctionParseNode.BuiltInFunction(name = JsonQueryFunction.NAME,
+        nodeClass = JsonQueryParseNode.class,
+        args = { @FunctionParseNode.Argument(allowedTypes = { PJson.class, PVarbinary.class }),
+                @FunctionParseNode.Argument(allowedTypes = { PVarchar.class }) })
+public class JsonQueryFunction extends ScalarFunction {
+
+    public static final String NAME = "JSON_QUERY";
+    private final JsonDataFormat
+            jsonDataFormat =
+            JsonDataFormatFactory.getJsonDataFormat(JsonDataFormatFactory.DataFormat.BSON);
+
+    // This is called from ExpressionType newInstance
+    public JsonQueryFunction() {
+
+    }
+
+    public JsonQueryFunction(List<Expression> children) {
+        super(children);
+        Preconditions.checkNotNull(getJSONPathExpr());
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (!getColValExpr().evaluate(tuple, ptr)) {
+            return false;
+        }
+        if (ptr == null || ptr.getLength() == 0) {
+            return false;
+        }
+
+        // Column name or JSON string
+        Object top = PJson.INSTANCE.toObject(ptr, getColValExpr().getSortOrder());
+
+        if (!getJSONPathExpr().evaluate(tuple, ptr)) {
+            return false;
+        }
+
+        if (ptr.getLength() == 0) {
+            return false;
+        }
+
+        String
+                jsonPathExprStr =
+                (String) PVarchar.INSTANCE.toObject(ptr, getJSONPathExpr().getSortOrder());
+        if (jsonPathExprStr == null) {
+            return false;
+        }
+        Object value = jsonDataFormat.getValue(top, jsonPathExprStr);
+        int valueType = jsonDataFormat.getValueType(top, jsonPathExprStr);
+        if (value != null) {
+            switch (valueType) {
+            case Types.ARRAY:
+            case Types.NVARCHAR:
+                ptr.set(PVarchar.INSTANCE.toBytes(value));
+                break;
+            default:
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private Expression getColValExpr() {
+        return getChildren().get(0);
+    }
+
+    private Expression getJSONPathExpr() {
+        return getChildren().get(1);
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PVarchar.INSTANCE;
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonValueFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonValueFunction.java
new file mode 100644
index 0000000..a42b0cf
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/JsonValueFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode;
+import org.apache.phoenix.parse.JsonValueParseNode;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.json.JsonDataFormat;
+import org.apache.phoenix.util.json.JsonDataFormatFactory;
+
+import java.sql.Types;
+import java.util.List;
+
+/**
+ * Built-in function for JSON_VALUE JSON_VALUE(<column_with_json/json_string>, <path> [returning
+ * <type>]) Extracts a scalar JSON value—everything except object and array—and returns it as a
+ * native type. The optional returning clause performs a typecast. Without a returning clause,
+ * JSON_VALUE returns a string.
+ */
+@FunctionParseNode.BuiltInFunction(name = JsonValueFunction.NAME,
+        nodeClass = JsonValueParseNode.class,
+        args = { @FunctionParseNode.Argument(allowedTypes = { PJson.class, PVarbinary.class }),
+                @FunctionParseNode.Argument(allowedTypes = { PVarchar.class }) })
+public class JsonValueFunction extends ScalarFunction {
+
+    public static final String NAME = "JSON_VALUE";
+    private final JsonDataFormat
+            jsonDataFormat =
+            JsonDataFormatFactory.getJsonDataFormat(JsonDataFormatFactory.DataFormat.BSON);
+
+    // This is called from ExpressionType newInstance
+    public JsonValueFunction() {
+
+    }
+
+    public JsonValueFunction(List<Expression> children) {
+        super(children);
+        Preconditions.checkNotNull(getJSONPathExpr());
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (!getColValExpr().evaluate(tuple, ptr)) {
+            return false;
+        }
+        if (ptr == null || ptr.getLength() == 0) {
+            return false;
+        }
+
+        // Column name or JSON string
+        Object top = PJson.INSTANCE.toObject(ptr, getColValExpr().getSortOrder());
+
+        if (!getJSONPathExpr().evaluate(tuple, ptr)) {
+            return false;
+        }
+
+        if (ptr.getLength() == 0) {
+            return false;
+        }
+
+        String
+                jsonPathExprStr =
+                (String) PVarchar.INSTANCE.toObject(ptr, getJSONPathExpr().getSortOrder());
+        if (jsonPathExprStr == null) {
+            return false;
+        }
+
+        Object value = jsonDataFormat.getValue(top, jsonPathExprStr);
+        int valueType = jsonDataFormat.getValueType(top, jsonPathExprStr);
+        if (value != null) {
+            switch (valueType) {
+            case Types.INTEGER:
+            case Types.BOOLEAN:
+            case Types.DOUBLE:
+            case Types.VARCHAR:
+            case Types.BIGINT:
+            case Types.BINARY:
+            case Types.DATE:
+                ptr.set(PVarchar.INSTANCE.toBytes(String.valueOf(value)));
+                break;
+            default:
+                return false;
+            }
+        } else {
+            ptr.set(PVarchar.INSTANCE.toBytes(null));
+        }
+
+        return true;
+    }
+
+    private Expression getColValExpr() {
+        return getChildren().get(0);
+    }
+
+    private Expression getJSONPathExpr() {
+        return getChildren().get(1);
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PVarchar.INSTANCE;
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 1d5b158..fb71555 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -311,6 +311,10 @@
                 explainPlanAttributesBuilder.setServerArrayElementProjection(true);
             }
         }
+        if (scan.getAttribute(BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION) != null
+                || scan.getAttribute(BaseScannerRegionObserverConstants.JSON_QUERY_FUNCTION) != null) {
+            planSteps.add("    SERVER JSON FUNCTION PROJECTION");
+        }
     }
 
     /**
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index c4c1940..83fec11 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -137,6 +137,7 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixResultSet.class);
 
     private final static String STRING_FALSE = "0";
+    private final static String LITERAL_STRING_FALSE = "false";
     private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
     private final static Integer INTEGER_FALSE = Integer.valueOf(0);
     private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
@@ -370,7 +371,7 @@
         if (type == PBoolean.INSTANCE) {
           return Boolean.TRUE.equals(value);
         } else if (type == PVarchar.INSTANCE) {
-          return !STRING_FALSE.equals(value);
+          return !STRING_FALSE.equals(value) && !LITERAL_STRING_FALSE.equals(value);
         } else if (type == PInteger.INSTANCE) {
           return !INTEGER_FALSE.equals(value);
         } else if (type == PDecimal.INSTANCE) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnDef.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnDef.java
index ac48359..062040d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnDef.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnDef.java
@@ -19,6 +19,7 @@
 
 import java.sql.SQLException;
 
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
@@ -145,6 +146,13 @@
                     }
                 }
             }
+            if (dataType != null && !dataType.canBePrimaryKey() && isPK) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_PRIMARY_KEY_CONSTRAINT)
+                        .setColumnName(columnDefName.getColumnName())
+                        .setMessage(
+                                "," + dataType.toString() + " is not supported as primary key,")
+                        .build().buildException();
+            }
             this.maxLength = maxLength;
             this.scale = scale;
             this.isPK = isPK;
@@ -222,7 +230,14 @@
     public void setIsPK(boolean isPK) {
         this.isPK = isPK;
     }
-    
+
+    public String toFullString() {
+        if (!Strings.isNullOrEmpty(columnDefName.getFamilyName())) {
+            return columnDefName.getFamilyName() + "." + toString();
+        }
+        return toString();
+    }
+
     @Override
     public String toString() {
         StringBuilder buf = new StringBuilder(columnDefName.getColumnNode().toString());
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnName.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnName.java
index 82439ec..5d62017 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnName.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ColumnName.java
@@ -39,7 +39,11 @@
     public static ColumnName newColumnName(NamedNode familyName, NamedNode columnName) {
         return new ColumnName(familyName, columnName);
     }
-    
+
+    public static ColumnName newColumnName(String familyName, String columnName) {
+        return new ColumnName(familyName, columnName);
+    }
+
     private ColumnName(NamedNode familyNode, NamedNode columnNode) {
         this.familyNode = familyNode;
         this.columnNode = columnNode;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ComparisonParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ComparisonParseNode.java
index e6a6922..ba2b20a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ComparisonParseNode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ComparisonParseNode.java
@@ -34,7 +34,7 @@
  */
 public abstract class ComparisonParseNode extends BinaryParseNode {
 
-    ComparisonParseNode(ParseNode lhs, ParseNode rhs) {
+    public ComparisonParseNode(ParseNode lhs, ParseNode rhs) {
         super(lhs, rhs);
     }
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonExistsParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonExistsParseNode.java
new file mode 100644
index 0000000..ee10f97
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonExistsParseNode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.JsonExistsFunction;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class JsonExistsParseNode extends FunctionParseNode {
+
+    public JsonExistsParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context)
+            throws SQLException {
+        PDataType dataType = children.get(0).getDataType();
+        if (!dataType.isCoercibleTo(PJson.INSTANCE)) {
+            throw new SQLException(dataType + " type is unsupported for JSON_EXISTS().");
+        }
+        return new JsonExistsFunction(children);
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonQueryParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonQueryParseNode.java
new file mode 100644
index 0000000..81093fb
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonQueryParseNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.JsonQueryFunction;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * ParseNode for JSON_QUERY function.
+ */
+public class JsonQueryParseNode extends FunctionParseNode {
+
+    public JsonQueryParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context)
+            throws SQLException {
+        PDataType dataType = children.get(0).getDataType();
+        if (!dataType.isCoercibleTo(PJson.INSTANCE)) {
+            throw new SQLException(dataType + " type is unsupported for JSON_QUERY().");
+        }
+        return new JsonQueryFunction(children);
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonValueParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonValueParseNode.java
new file mode 100644
index 0000000..33f75fc
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/JsonValueParseNode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.JsonValueFunction;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class JsonValueParseNode extends FunctionParseNode {
+
+    public JsonValueParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context)
+            throws SQLException {
+        PDataType dataType = children.get(0).getDataType();
+        if (!dataType.isCoercibleTo(PJson.INSTANCE)) {
+            throw new SQLException(dataType + " type is unsupported for JSON_VALUE().");
+        }
+        return new JsonValueFunction(children);
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/NamedTableNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
index 6fcd451..a04c214 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
@@ -86,7 +86,7 @@
         if (!dynColumns.isEmpty()) {
             buf.append('(');
             for (ColumnDef def : dynColumns) {
-                buf.append(def);
+                buf.append(def.toFullString());
                 buf.append(',');
             }
             buf.setLength(buf.length()-1);
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 8f937a9..53e8263 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -38,6 +38,13 @@
  * @since 0.1
  */
 public class SelectStatement implements FilterableStatement {
+    public static final SelectStatement SELECT_STAR =
+            new SelectStatement(
+                    null, null, false,
+                    Arrays.asList(),
+                    null, Collections.<ParseNode>emptyList(),
+                    null, Collections.<OrderByNode>emptyList(),
+                    null, null, 0, false, false, Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1));
     public static final SelectStatement SELECT_ONE =
             new SelectStatement(
                     null, null, false, 
@@ -65,7 +72,13 @@
                 select.getSelect(), select.getWhere(), select.getGroupBy(), select.getHaving(), 
                 select.getOrderBy(), select.getLimit(), select.getOffset(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
     }
-    
+
+    public static SelectStatement create(SelectStatement select, TableNode tableNode, List<AliasedNode> selects) {
+        return new SelectStatement(tableNode, select.getHint(), select.isDistinct(),
+                selects, select.getWhere(), select.getGroupBy(), select.getHaving(),
+                select.getOrderBy(), select.getLimit(), select.getOffset(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+    }
+
     public SelectStatement combine(ParseNode where) {
         if (where == null) {
             return this;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ComparisonNotSupportedException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ComparisonNotSupportedException.java
new file mode 100644
index 0000000..8449c77
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ComparisonNotSupportedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.schema.types.PDataType;
+
+public class ComparisonNotSupportedException extends RuntimeException {
+    public ComparisonNotSupportedException(PDataType<?> pDataType) {
+        super(new SQLExceptionInfo.Builder(SQLExceptionCode.COMPARISON_UNSUPPORTED)
+                .setMessage(" for type " + pDataType.toString()).build().buildException());
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0e78202..2761581 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1538,6 +1538,11 @@
                 if (expressionIndexCompiler.isAggregate()) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                 }
+                if (expressionIndexCompiler.isJsonFragment()) {
+                    throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.JSON_FRAGMENT_NOT_ALLOWED_IN_INDEX_EXPRESSION).build()
+                            .buildException();
+                }
                 if (!(expression.getDeterminism() == Determinism.ALWAYS || expression.getDeterminism() == Determinism.PER_ROW)) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataType.java
index c3071ba..4b33891 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataType.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataType.java
@@ -44,6 +44,7 @@
 import org.apache.phoenix.thirdparty.com.google.common.math.LongMath;
 import org.apache.phoenix.thirdparty.com.google.common.primitives.Doubles;
 import org.apache.phoenix.thirdparty.com.google.common.primitives.Longs;
+import org.bson.RawBsonDocument;
 
 /**
  * The data types of PColumns
@@ -94,6 +95,21 @@
         return equalsAny(this, otherType, PVarbinary.INSTANCE, PBinary.INSTANCE);
     }
 
+    /**
+     * @return true if {@link PDataType} can be declared as primary key otherwise false.
+     */
+    public boolean canBePrimaryKey() {
+        return true;
+    }
+
+    /**
+     * @return true if {@link PDataType} supports equality operators (=,!=,<,>,<=,>=) otherwise
+     *         false.
+     */
+    public boolean isComparisonSupported() {
+        return true;
+    }
+
     public int estimateByteSize(Object o) {
         if (isFixedWidth()) { return getByteSize(); }
         if (isArrayType()) {
@@ -515,6 +531,7 @@
     public final static Integer DOUBLE_PRECISION = 15;
 
     public static final int ARRAY_TYPE_BASE = 3000;
+    public static final int JSON_TYPE = 5000;
     public static final String ARRAY_TYPE_SUFFIX = "ARRAY";
 
     protected static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
@@ -1175,6 +1192,18 @@
         }
         for (PDataType type : PDataType.values()) {
             if (type.isArrayType()) {
+                if(type.getJavaClass().isInstance(value)){
+                    if (type.isArrayType()) {
+                        PhoenixArray arr = (PhoenixArray) value;
+                        if ((type.getSqlType() == arr.baseType.sqlType
+                                + PDataType.ARRAY_TYPE_BASE)) {
+                            return type;
+                        }
+                    } else {
+                        return type;
+                    }
+                }
+
                 if (value instanceof PhoenixArray) {
                     PhoenixArray arr = (PhoenixArray)value;
                     if ((type.getSqlType() == arr.baseType.sqlType + PDataType.ARRAY_TYPE_BASE)
@@ -1190,6 +1219,10 @@
                         }
                     } catch (SQLException e) { /* Passthrough to fail */ }
                 }
+            } else if (value instanceof RawBsonDocument) {
+                if (type == PJson.INSTANCE) {
+                    return type;
+                }
             } else {
                 if (type.getJavaClass().isInstance(value)) {
                     return type;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java
index 3da8b2e..b7d30e3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PDataTypeFactory.java
@@ -123,6 +123,7 @@
     types.add(PVarbinaryArray.INSTANCE);
     types.add(PVarchar.INSTANCE);
     types.add(PVarcharArray.INSTANCE);
+    types.add(PJson.INSTANCE);
 
     classToInstance = new HashMap<>(types.size());
     for (PDataType t : types) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
new file mode 100644
index 0000000..e437fb5
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PJson.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.types;
+
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.json.JsonDataFormat;
+import org.apache.phoenix.util.json.JsonDataFormatFactory;
+
+/**
+ * <p>
+ * A Phoenix data type to represent JSON. The json data type stores JSON in BSON format as used by
+ * mongodb. We use the mongodb libraries to store and retrieve the JSON object using the JSON
+ * functions.
+ * <p>
+ * JSON data types are for storing JSON (JavaScript Object Notation) data, as specified in RFC 7159.
+ * Such data can also be stored as text, but the JSON data types have the advantage of enforcing
+ * that each stored value is valid according to the JSON rules.
+ */
+public class PJson extends PVarbinary {
+
+    public static final PJson INSTANCE = new PJson();
+    private JsonDataFormat jsonDataFormat;
+
+    private PJson() {
+        super("JSON", PDataType.JSON_TYPE, byte[].class, null, 48);
+        jsonDataFormat = JsonDataFormatFactory.getJsonDataFormat(JsonDataFormatFactory.DataFormat.BSON);
+    }
+
+    @Override
+    public boolean canBePrimaryKey() {
+        return false;
+    }
+
+    @Override
+    public boolean isComparisonSupported() {
+        return false;
+    }
+
+    @Override
+    public int toBytes(Object object, byte[] bytes, int offset) {
+        if (object == null) {
+            return 0;
+        }
+        byte[] b = toBytes(object);
+        System.arraycopy(b, 0, bytes, offset, b.length);
+        return b.length;
+
+    }
+
+    @Override
+    public byte[] toBytes(Object object) {
+        if (object == null) {
+            return ByteUtil.EMPTY_BYTE_ARRAY;
+        }
+        return jsonDataFormat.toBytes(object);
+    }
+
+    @Override
+    public Object toObject(byte[] bytes, int offset, int length,
+            @SuppressWarnings("rawtypes") PDataType actualType, SortOrder sortOrder,
+            Integer maxLength, Integer scale) {
+        if (length == 0) {
+            return null;
+        }
+        return jsonDataFormat.toObject(bytes, offset, length);
+    }
+
+    @Override
+    public Object toObject(Object object, @SuppressWarnings("rawtypes") PDataType actualType) {
+        if (object == null) {
+            return null;
+        }
+        if (equalsAny(actualType, PVarchar.INSTANCE)) {
+            return toObject((String) object);
+        }
+        return object;
+    }
+
+    @Override
+    public Object toObject(String value) {
+        if (value == null || value.length() == 0) {
+            return null;
+        }
+        return jsonDataFormat.toObject(value);
+    }
+
+    @Override
+    public boolean isCoercibleTo(@SuppressWarnings("rawtypes") PDataType targetType) {
+        return equalsAny(targetType, this, PBinary.INSTANCE, PVarbinary.INSTANCE);
+
+    }
+
+    @Override
+    public int estimateByteSize(Object o) {
+        return jsonDataFormat.estimateByteSize(o);
+    }
+
+    @Override
+    public Integer getByteSize() {
+        return null;
+    }
+
+    @Override
+    public boolean isBytesComparableWith(@SuppressWarnings("rawtypes") PDataType otherType) {
+        return otherType == PVarbinary.INSTANCE;
+    }
+
+    @Override
+    public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+        String json = "{a : 1}";
+        return this.toObject(json);
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
index 5799313..3cba468 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
@@ -33,6 +33,10 @@
         super("VARBINARY", Types.VARBINARY, byte[].class, null, 22);
     }
 
+    PVarbinary(String sqlTypeName, int sqlType, Class clazz, PDataCodec codec, int ordinal) {
+        super(sqlTypeName, sqlType, clazz, codec, ordinal);
+    }
+
     @Override
     public byte[] toBytes(Object object) {
         if (object == null) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarchar.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarchar.java
index ce35710..6b431bb 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarchar.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/types/PVarchar.java
@@ -89,6 +89,11 @@
         if (equalsAny(actualType, this, PChar.INSTANCE)) {
             String s = (String) object;
             return s == null || s.length() > 0 ? s : null;
+        } else if (equalsAny(actualType, PVarchar.INSTANCE)) {
+            if (object == null) {
+                return null;
+            }
+            return object.toString();
         }
         return throwConstraintViolationException(actualType, this);
     }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/BsonDataFormat.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/BsonDataFormat.java
new file mode 100644
index 0000000..46ac53f
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/BsonDataFormat.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.json;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.PathNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.bson.BsonBinaryReader;
+import org.bson.BsonDocument;
+import org.bson.BsonDocumentReader;
+import org.bson.BsonValue;
+import org.bson.RawBsonDocument;
+import org.bson.codecs.BsonDocumentCodec;
+import org.bson.codecs.DecoderContext;
+import org.bson.codecs.RawBsonDocumentCodec;
+import org.bson.io.ByteBufferBsonInput;
+
+import java.nio.ByteBuffer;
+import java.sql.Types;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BsonDataFormat implements JsonDataFormat {
+    @Override
+    public byte[] toBytes(Object object) {
+        return Bytes.toBytes(((RawBsonDocument) object).getByteBuffer().asNIO());
+    }
+
+    @Override
+    public Object toObject(String value) {
+        return RawBsonDocument.parse(value);
+    }
+
+    @Override
+    public Object toObject(byte[] bytes, int offset, int length) {
+        return new RawBsonDocument(bytes, offset, length);
+    }
+
+    @Override
+    public int estimateByteSize(Object o) {
+        RawBsonDocument rawBSON = (RawBsonDocument) o;
+        return rawBSON.size();
+    }
+
+    @Override
+    public int getValueType(Object obj, String jsonPathExprStr) {
+        BsonValue value = getBsonValue(jsonPathExprStr, (RawBsonDocument) obj);
+        return getSqlType(value);
+    }
+
+    @Override
+    public Object getValue(Object obj, String jsonPathExprStr) {
+        BsonValue value = getBsonValue(jsonPathExprStr, (RawBsonDocument) obj);
+        return getValue(value);
+    }
+
+    private Object getValue(BsonValue value) {
+        if (value != null) {
+            switch (value.getBsonType()) {
+            case INT32:
+                return value.asInt32().getValue();
+            case INT64:
+                return value.asInt64().getValue();
+            case STRING:
+            case SYMBOL:
+                return value.asString().getValue();
+            case DECIMAL128:
+                return value.asDecimal128().doubleValue();
+            case DOUBLE:
+                return value.asDouble().getValue();
+            case BOOLEAN:
+                return value.asBoolean().getValue();
+            case BINARY:
+                return value.asBinary().getData();
+            case DATE_TIME:
+                return value.asDateTime().getValue();
+            case DOCUMENT:
+                return value.asDocument().toJson();
+            case ARRAY:
+                return readArray(value).toString();
+            default:
+                return null;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public ByteBuffer updateValue(Object top, String jsonPathExprStr, String newVal) {
+        Configuration conf = Configuration.builder().jsonProvider(new BsonJsonProvider()).build();
+        BsonValue newValue = JsonPath.using(conf).parse(newVal).json();
+        BsonDocument root = fromRaw((RawBsonDocument) top);
+        JsonPath.using(conf).parse(root).set(jsonPathExprStr, newValue);
+        RawBsonDocument
+                updated =
+                new RawBsonDocumentCodec().decode(new BsonDocumentReader(root),
+                        DecoderContext.builder().build());
+        return updated.getByteBuffer().asNIO();
+    }
+
+    // Ref: https://github.com/json-path/JsonPath/pull/828
+    @Override
+    public boolean isPathValid(Object top, String path) {
+        try{
+            Configuration conf = Configuration.builder().jsonProvider(new BsonJsonProvider()).build();
+            BsonDocument root = fromRaw((RawBsonDocument) top);
+            JsonPath.using(conf).parse(root).read(path);
+            return true;
+        }
+        catch (PathNotFoundException e){
+            return false;
+        }
+    }
+
+    private BsonValue getBsonValue(String jsonPathExprStr, RawBsonDocument top) {
+        Configuration conf = getConfiguration();
+        BsonValue value = JsonPath.using(conf).parse(top).read(jsonPathExprStr, BsonValue.class);
+        return value;
+    }
+
+    private List<Object> readArray(BsonValue value) {
+        return value.asArray().stream().map(e -> {
+            // The reason for handling string in a special way is because:
+            // Given a string array in JSON - ["hello","world"]
+            // A string array when converted to a string returns
+            // as [hello, world] - the quotes stripped
+            // This change allows to retain those quotes.
+            if (e.isString() || e.isSymbol()) {
+                return "\"" + getValue(e) + "\"";
+            } else {
+                return String.valueOf(getValue(e));
+            }
+        }).collect(Collectors.toList());
+    }
+
+    private Configuration getConfiguration() {
+        Configuration conf = Configuration.builder().jsonProvider(new BsonJsonProvider()).build();
+        // This options will make us work in lax mode.
+        conf = conf.addOptions(Option.SUPPRESS_EXCEPTIONS);
+        return conf;
+    }
+
+    // Transform to an in memory BsonDocument instance
+    private BsonDocument fromRaw(RawBsonDocument rawDocument) {
+        // Transform to an in memory BsonDocument instance
+        BsonBinaryReader
+                bsonReader =
+                new BsonBinaryReader(new ByteBufferBsonInput(rawDocument.getByteBuffer()));
+        try {
+            return new BsonDocumentCodec().decode(bsonReader, DecoderContext.builder().build());
+        } finally {
+            bsonReader.close();
+        }
+    }
+
+    private int getSqlType(BsonValue value) {
+        if (value == null) {
+            return Types.NULL;
+        }
+        switch (value.getBsonType()) {
+        case INT32:
+            return Types.INTEGER;
+        case INT64:
+            return Types.BIGINT;
+        case DECIMAL128:
+        case DOUBLE:
+            return Types.DOUBLE;
+        case STRING:
+        case SYMBOL:
+            return Types.VARCHAR;
+        case BOOLEAN:
+            return Types.BOOLEAN;
+        case BINARY:
+            return Types.BINARY;
+        case DATE_TIME:
+            return Types.DATE;
+        case ARRAY:
+            return Types.ARRAY;
+        case DOCUMENT:
+            return Types.NVARCHAR;
+        default:
+            return Types.OTHER;
+        }
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/BsonJsonProvider.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/BsonJsonProvider.java
new file mode 100644
index 0000000..f9bedf4
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/BsonJsonProvider.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.json;
+
+import com.jayway.jsonpath.InvalidJsonException;
+import com.jayway.jsonpath.spi.json.AbstractJsonProvider;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonObjectId;
+import org.bson.BsonString;
+import org.bson.BsonType;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.json.JsonReader;
+import org.bson.types.ObjectId;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class BsonJsonProvider extends AbstractJsonProvider {
+
+    @Override
+    public Object parse(final String json) throws InvalidJsonException {
+        JsonReader jsonReader = new JsonReader(json);
+        BsonType bsonType = jsonReader.readBsonType();
+        switch (bsonType) {
+        case ARRAY:
+            return BsonArray.parse(json);
+        case DOCUMENT:
+            return BsonDocument.parse(json);
+        case STRING:
+            return new BsonString(jsonReader.readString());
+        case INT32:
+            return new BsonInt32(jsonReader.readInt32());
+        default:
+            throw new InvalidJsonException(String.format("Unsupported bson type %s", bsonType));
+        }
+    }
+
+    @Override
+    public Object parse(InputStream jsonStream, String charset) throws InvalidJsonException {
+        return null;
+    }
+
+    @Override
+    public String toJson(Object obj) {
+        return null;
+    }
+
+    @Override
+    public Object createArray() {
+        return new BsonArray();
+    }
+
+    @Override
+    public boolean isArray(final Object obj) {
+
+        return (obj instanceof BsonArray || obj instanceof List);
+    }
+
+    @Override
+    public Object getArrayIndex(final Object obj, final int idx) {
+
+        return toBsonArray(obj).get(idx);
+    }
+
+    @Override
+    public void setArrayIndex(final Object array, final int index, final Object newValue) {
+        if (!isArray(array)) {
+            throw new UnsupportedOperationException();
+        } else {
+            BsonArray arr = toBsonArray(array);
+            if (index == arr.size()) {
+                arr.add(toBsonValue(newValue));
+            } else {
+                arr.set(index, toBsonValue(newValue));
+            }
+        }
+    }
+
+    @Override
+    public Object createMap() {
+        return new BsonDocument();
+    }
+
+    @Override
+    public boolean isMap(final Object obj) {
+        return (obj instanceof BsonDocument);
+    }
+
+    @Override
+    public Object getMapValue(final Object obj, final String key) {
+        BsonDocument bsonDocument = toBsonDocument(obj);
+        Object o = bsonDocument.get(key);
+        if (!bsonDocument.containsKey(key)) {
+            return UNDEFINED;
+        } else {
+            return unwrap(o);
+        }
+    }
+
+    @Override
+    public Iterable<?> toIterable(final Object obj) {
+        BsonArray arr = toBsonArray(obj);
+        List<Object> values = new ArrayList<Object>(arr.size());
+        for (Object o : arr) {
+            values.add(toJavaType(toBsonValue(o)));
+        }
+        return values;
+    }
+
+    @Override
+    public void setProperty(final Object obj, final Object key, final Object value) {
+        if (isMap(obj)) {
+            toBsonDocument(obj).put(key.toString(), toBsonValue(value));
+        } else {
+            BsonArray array = toBsonArray(obj);
+            int index;
+            if (key != null) {
+                index = key instanceof Integer ? (Integer) key : Integer.parseInt(key.toString());
+            } else {
+                index = array.size();
+            }
+
+            if (index == array.size()) {
+                array.add(toBsonValue(value));
+            } else {
+                array.set(index, toBsonValue(value));
+            }
+        }
+    }
+
+    private static BsonArray toBsonArray(final Object o) {
+        return (BsonArray) o;
+    }
+
+    private static BsonDocument toBsonDocument(final Object o) {
+        return (BsonDocument) o;
+    }
+
+    /**
+     * Refer to this link for background on the implementation :
+     * https://github.com/spring-projects/spring-data-mongodb/blob/main/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/BsonUtils.java#L66
+     * @param source
+     * @return
+     */
+    private static BsonValue toBsonValue(Object source) {
+
+        if (source instanceof BsonValue) {
+            return (BsonValue) source;
+        }
+
+        if (source instanceof String) {
+            return new BsonString((String) source);
+        }
+
+        if (source instanceof ObjectId) {
+            return new BsonObjectId((ObjectId) source);
+        }
+
+        if (source instanceof Double) {
+            return new BsonDouble((Double) source);
+        }
+
+        if (source instanceof Integer) {
+            return new BsonInt32((Integer) source);
+        }
+
+        if (source instanceof Long) {
+            return new BsonInt64((Long) source);
+        }
+
+        if (source instanceof byte[]) {
+            return new BsonBinary((byte[]) source);
+        }
+
+        if (source instanceof Boolean) {
+            return new BsonBoolean((Boolean) source);
+        }
+
+        if (source instanceof Float) {
+            return new BsonDouble((Float) source);
+        }
+
+        throw new IllegalArgumentException(String.format("Unable to convert %s (%s) to BsonValue.", source,
+            source != null ? source.getClass().getName() : "null"));
+    }
+
+    /**
+     * Extract the corresponding plain value from {@link BsonValue}. Eg. plain {@link String} from
+     * {@link org.bson.BsonString}.
+     *
+     * @param value must not be {@literal null}.
+     * @return
+     * @since 2.1
+     */
+    public static Object toJavaType(BsonValue value) {
+
+        switch (value.getBsonType()) {
+            case INT32:
+                return value.asInt32().getValue();
+            case INT64:
+                return value.asInt64().getValue();
+            case STRING:
+                return value.asString().getValue();
+            case DECIMAL128:
+                return value.asDecimal128().doubleValue();
+            case DOUBLE:
+                return value.asDouble().getValue();
+            case BOOLEAN:
+                return value.asBoolean().getValue();
+            case OBJECT_ID:
+                return value.asObjectId().getValue();
+            case BINARY:
+                return value.asBinary().getData();
+            case DATE_TIME:
+                return new Date(value.asDateTime().getValue());
+            case SYMBOL:
+                return value.asSymbol().getSymbol();
+            case ARRAY:
+                return value.asArray().toArray();
+            case DOCUMENT:
+                return Document.parse(value.asDocument().toJson());
+            default:
+                return value;
+        }
+    }
+
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/JsonDataFormat.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/JsonDataFormat.java
new file mode 100644
index 0000000..0d11891
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/JsonDataFormat.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.json;
+
+import java.nio.ByteBuffer;
+
+public interface JsonDataFormat {
+    /**
+     * Return the byte[] of the Json Object of the underlying format.
+     * @param object
+     * @return
+     */
+    byte[] toBytes(Object object);
+
+    /**
+     * Return the Object corresponding to the Data format in which JSON is stored
+     * @param value
+     * @return
+     */
+    Object toObject(String value);
+
+    /**
+     * Return the Object corresponding to the Data format in which JSON is stored
+     * @param bytes
+     * @param offset
+     * @param length
+     * @return
+     */
+    Object toObject(byte[] bytes, int offset, int length);
+
+    /**
+     * Get the estimated size of the object - Json
+     * @param o
+     * @return
+     */
+    int estimateByteSize(Object o);
+
+    /**
+     * Get the type of the value in the Json in the specified path. The type confirms to a
+     * java.sql.Types
+     * @param obj
+     * @param jsonPathExprStr
+     * @return
+     */
+    int getValueType(Object obj, String jsonPathExprStr);
+
+    /**
+     * Get the value from Json in the specified path
+     * @param obj
+     * @param jsonPathExprStr
+     * @return
+     */
+    Object getValue(Object obj, String jsonPathExprStr);
+
+    /**
+     * Update the value in the Json path and return the ByteBuffer
+     * @param top
+     * @param jsonPathExprStr
+     * @param newVal
+     * @return
+     */
+    ByteBuffer updateValue(Object top, String jsonPathExprStr, String newVal);
+
+    /**
+     * Checks if the path is valid in a JSON document.
+     * @param top
+     * @param path
+     * @return
+     */
+    boolean isPathValid(Object top, String path);
+}
\ No newline at end of file
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/JsonDataFormatFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/JsonDataFormatFactory.java
new file mode 100644
index 0000000..7795733
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/json/JsonDataFormatFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.json;
+
+public class JsonDataFormatFactory {
+    public enum DataFormat {
+        BSON,
+        STRING
+    }
+    public static JsonDataFormat getJsonDataFormat(DataFormat type) {
+        if(type == DataFormat.BSON)
+            return  new BsonDataFormat();
+        else return null;
+    }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 72f92e8..fa45062 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -139,7 +139,7 @@
                     region.getRegionInfo().getTable().getNameAsString());
             throw new DoNotRetryIOException(cause.getMessage(), cause);
         }
-        if(isLocalIndex) {
+        if (isLocalIndex) {
             ScanUtil.setupLocalIndexScan(scan);
         }
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 12873c9..52cfad9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -55,6 +56,9 @@
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
+import org.apache.phoenix.expression.function.JsonQueryFunction;
+import org.apache.phoenix.expression.function.JsonValueFunction;
+import org.apache.phoenix.expression.function.ScalarFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
@@ -118,13 +122,16 @@
         PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
         boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
 
-        Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
+        Set<KeyValueColumnExpression> serverParsedKVRefs = Sets.newHashSet();
         KeyValueSchema kvSchema = null;
         ValueBitSet kvSchemaBitSet = null;
-        Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
-        if (arrayFuncRefs != null) {
-            KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
-            for (Expression expression : arrayFuncRefs) {
+        List<Expression> resultList = getServerParsedExpressions(scan, serverParsedKVRefs);
+        Expression[] serverParsedFuncRefs = resultList.toArray(new Expression[0]);
+        if (serverParsedFuncRefs != null && serverParsedFuncRefs.length > 0) {
+            KeyValueSchema.KeyValueSchemaBuilder
+                    builder =
+                    new KeyValueSchema.KeyValueSchemaBuilder(0);
+            for (Expression expression : serverParsedFuncRefs) {
                 builder.addField(expression);
             }
             kvSchema = builder.build();
@@ -159,15 +166,18 @@
                 env.getConfiguration().get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY) != null) {
             dataRegion = env.getRegion();
         }
-        innerScanner = getWrappedScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns,
-                tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null,
-                ptr, useQualifierAsIndex);
+        innerScanner =
+                getWrappedScanner(env, innerScanner, serverParsedKVRefs, serverParsedFuncRefs,
+                        offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
+                        viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr,
+                        useQualifierAsIndex);
 
         final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
         if (j != null) {
-            innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
-                    p, j, tenantId, useQualifierAsIndex,
-                    useNewValueColumnQualifier);
+            innerScanner =
+                    new HashJoinRegionScanner(env, innerScanner, scan, serverParsedKVRefs,
+                            serverParsedFuncRefs, p, j, tenantId, useQualifierAsIndex,
+                            useNewValueColumnQualifier);
         }
         if (scanOffset != null) {
             final boolean isIncompatibleClient =
@@ -203,6 +213,39 @@
         return getTopNScanner(env, innerScanner, iterator, tenantId);
     }
 
+    private List<Expression> getServerParsedExpressions(Scan scan,
+            Set<KeyValueColumnExpression> serverParsedKVRefs) {
+        Expression[] serverParsedArrayFuncRefs = null;
+        if (scan.getAttribute(BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX) != null) {
+            serverParsedArrayFuncRefs =
+                    deserializeServerParsedPositionalExpressionInfoFromScan(scan,
+                            BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX, serverParsedKVRefs);
+        }
+        List<Expression> resultList = new ArrayList<>();
+        if (serverParsedArrayFuncRefs != null) {
+            Collections.addAll(resultList, serverParsedArrayFuncRefs);
+        }
+        Expression[] serverParsedJsonValueFuncRefs = null;
+        if (scan.getAttribute(BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION) != null) {
+            serverParsedJsonValueFuncRefs =
+                    deserializeServerParsedPositionalExpressionInfoFromScan(scan,
+                            BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION, serverParsedKVRefs);
+        }
+        if (serverParsedJsonValueFuncRefs != null) {
+            Collections.addAll(resultList, serverParsedJsonValueFuncRefs);
+        }
+        Expression[] serverParsedJsonQueryFuncRefs = null;
+        if (scan.getAttribute(BaseScannerRegionObserverConstants.JSON_QUERY_FUNCTION) != null) {
+            serverParsedJsonQueryFuncRefs =
+                    deserializeServerParsedPositionalExpressionInfoFromScan(scan,
+                            BaseScannerRegionObserverConstants.JSON_QUERY_FUNCTION, serverParsedKVRefs);
+        }
+        if (serverParsedJsonQueryFuncRefs != null) {
+            Collections.addAll(resultList, serverParsedJsonQueryFuncRefs);
+        }
+        return resultList;
+    }
+
     @VisibleForTesting
     static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
                                                      boolean spoolingEnabled, long thresholdBytes) {
@@ -255,31 +298,40 @@
         }
     }
 
-    private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
-                                                                          Set<KeyValueColumnExpression> arrayKVRefs) {
-        byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX);
+    private Expression[] deserializeServerParsedPositionalExpressionInfoFromScan(Scan scan,
+            String scanAttribute, Set<KeyValueColumnExpression> serverParsedKVRefs) {
+        byte[] specificArrayIdx = scan.getAttribute(scanAttribute);
         if (specificArrayIdx == null) {
             return null;
         }
         ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
         try {
             DataInputStream input = new DataInputStream(stream);
-            int arrayKVRefSize = WritableUtils.readVInt(input);
-            for (int i = 0; i < arrayKVRefSize; i++) {
+            int kvRefSize = WritableUtils.readVInt(input);
+            for (int i = 0; i < kvRefSize; i++) {
                 PTable.ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
                 KeyValueColumnExpression kvExp = scheme != PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression(scheme)
                         : new KeyValueColumnExpression();
                 kvExp.readFields(input);
-                arrayKVRefs.add(kvExp);
+                serverParsedKVRefs.add(kvExp);
             }
-            int arrayKVFuncSize = WritableUtils.readVInt(input);
-            Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
-            for (int i = 0; i < arrayKVFuncSize; i++) {
-                ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
-                arrayIdxFunc.readFields(input);
-                arrayFuncRefs[i] = arrayIdxFunc;
+            int kvFuncSize = WritableUtils.readVInt(input);
+            Expression[] funcRefs = new Expression[kvFuncSize];
+            for (int i = 0; i < kvFuncSize; i++) {
+                ScalarFunction func = null;
+                if (scanAttribute.equals(BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX)) {
+                    func = new ArrayIndexFunction();
+                } else if (scanAttribute.equals(BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION))  {
+                    func = new JsonValueFunction();
+                } else if (scanAttribute.equals(BaseScannerRegionObserverConstants.JSON_QUERY_FUNCTION))  {
+                    func = new JsonQueryFunction();
+                }
+                if (func != null) {
+                    func.readFields(input);
+                    funcRefs[i] = func;
+                }
             }
-            return arrayFuncRefs;
+            return funcRefs;
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 112bb5f..5a65c5a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -107,8 +107,8 @@
    * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
    * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
    * the same from a custom filter.
-   * @param arrayKVRefs
-   * @param arrayFuncRefs
+   * @param serverParsedKVRefs
+   * @param serverParsedFuncRefs
    * @param offset starting position in the rowkey.
    * @param scan
    * @param tupleProjector
@@ -118,8 +118,9 @@
    * @param viewConstants
    */
   public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env,
-      final RegionScanner regionScanner, final Set<KeyValueColumnExpression> arrayKVRefs,
-      final Expression[] arrayFuncRefs, final int offset, final Scan scan,
+          final RegionScanner regionScanner, final Set<KeyValueColumnExpression> serverParsedKVRefs,
+          final Expression[] serverParsedFuncRefs,
+          final int offset, final Scan scan,
       final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
       final Region dataRegion, final IndexMaintainer indexMaintainer,
       PhoenixTransactionContext tx,
@@ -284,10 +285,11 @@
                 }
             }
           }
-          Cell arrayElementCell = null;
-          if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
-            int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
-            arrayElementCell = result.get(arrayElementCellPosition);
+          Cell serverParsedResultCell = null;
+          if (serverParsedFuncRefs != null && serverParsedFuncRefs.length > 0 && serverParsedKVRefs.size() > 0) {
+            int resultPosition = replaceServerParsedExpressionElement(serverParsedKVRefs,
+                    serverParsedFuncRefs, result);
+            serverParsedResultCell = result.get(resultPosition);
           }
           if (projector != null) {
             Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
@@ -301,8 +303,8 @@
             result.clear();
             result.add(tupleWithDynColsIfReqd.mergeWithDynColsListBytesAndGetValue(0,
                     serializedDynColsList));
-            if (arrayElementCell != null) {
-              result.add(arrayElementCell);
+            if (serverParsedResultCell != null) {
+              result.add(serverParsedResultCell);
             }
           }
           if (extraLimit >= 0 && --extraLimit == 0) {
@@ -413,38 +415,40 @@
         return next;
       }
 
-      private int replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
-          final Expression[] arrayFuncRefs, List<Cell> result) {
+      private int replaceServerParsedExpressionElement(
+              final Set<KeyValueColumnExpression> serverParsedKVRefs,
+              final Expression[] serverParsedFuncRefs, List<Cell> result) {
         // make a copy of the results array here, as we're modifying it below
         MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result));
         // The size of both the arrays would be same?
         // Using KeyValueSchema to set and retrieve the value
         // collect the first kv to get the row
         Cell rowKv = result.get(0);
-        for (KeyValueColumnExpression kvExp : arrayKVRefs) {
+        for (KeyValueColumnExpression kvExp : serverParsedKVRefs) {
           if (kvExp.evaluate(tuple, ptr)) {
             ListIterator<Cell> itr = result.listIterator();
             while (itr.hasNext()) {
               Cell kv = itr.next();
               if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
-                  kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
-                  && Bytes.equals(kvExp.getColumnQualifier(), 0, kvExp.getColumnQualifier().length,
-                  kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
-                // remove the kv that has the full array values.
+                      kv.getFamilyArray(), kv.getFamilyOffset(),
+                      kv.getFamilyLength()) && Bytes.equals(kvExp.getColumnQualifier(), 0,
+                      kvExp.getColumnQualifier().length, kv.getQualifierArray(),
+                      kv.getQualifierOffset(), kv.getQualifierLength())) {
+                // remove the kv that has the full array/json values.
                 itr.remove();
                 break;
               }
             }
           }
         }
-        byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs,
-            kvSchemaBitSet, ptr);
-        // Add a dummy kv with the exact value of the array index
+        byte[] value = kvSchema.toBytes(tuple, serverParsedFuncRefs, kvSchemaBitSet, ptr);
+        // Add a dummy kv with the exact value of the array index or json value
         result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(),
-            QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
-            QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
-            QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
-            KeyValue.Type.codeToType(rowKv.getType().getCode()), value, 0, value.length));
+                QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0,
+                QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
+                QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
+                QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
+                KeyValue.Type.codeToType(rowKv.getType().getCode()), value, 0, value.length));
         return getArrayCellPosition(result);
       }
 
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 60f1890..6b11cf5 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -240,6 +240,19 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>2.6.0</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.mongodb/bson -->
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>4.4.0</version>
+        </dependency>
+
         <!-- Omid dependencies -->
         <dependency>
             <groupId>org.apache.omid</groupId>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexIT.java
index 7eaa5ef..50bc677 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexIT.java
@@ -17,10 +17,31 @@
  */
 package org.apache.phoenix.end2end.index;
 
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.PhoenixParserException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -36,26 +57,19 @@
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.phoenix.end2end.IndexToolIT;
-import org.apache.phoenix.exception.PhoenixParserException;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.*;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
@@ -184,7 +198,7 @@
                             + "S UNSIGNED_DATE, T UNSIGNED_TIMESTAMP, U CHAR(10), V BINARY(1024), "
                             + "W VARBINARY, Y INTEGER ARRAY, Z VARCHAR ARRAY[10], AA DATE ARRAY, "
                             + "AB TIMESTAMP ARRAY, AC UNSIGNED_TIME ARRAY, AD UNSIGNED_DATE ARRAY, "
-                            + "AE UNSIGNED_TIMESTAMP ARRAY "
+                            + "AE UNSIGNED_TIMESTAMP ARRAY, AF JSON "
                             + "CONSTRAINT pk PRIMARY KEY (id,kp)) "
                             + "MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0" );
             String indexTableName = generateUniqueName();
@@ -824,6 +838,177 @@
     }
 
     @Test
+    public void testPartialIndexWithJson() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar not null primary key, " +
+                    "A integer, B integer, C double, D varchar, jsoncol json)");
+            String indexTableName = generateUniqueName();
+            String json = "{\"info\":{\"age\": %s }}";
+            // Add rows to the data table before creating a partial index to test that the index
+            // will be built correctly by IndexTool
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id1', 25, 2, 3.14, 'a','" +
+                            String.format(json, 25) + "')");
+
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " (id, A, D, jsoncol)" +
+                            " values ('id2', 100, 'b','" + String.format(json, 100) + "')");
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") + "INDEX " + indexTableName +
+                    " on " + dataTableName + " (CAST(TO_NUMBER(JSON_VALUE(jsoncol, '$.info.age')) AS INTEGER)) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE (CAST(TO_NUMBER(JSON_VALUE(jsoncol, '$.info.age')) AS INTEGER)) > 50 ASYNC");
+
+            IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
+
+            String selectSql =
+                    "SELECT  D from " + dataTableName + " WHERE (CAST(TO_NUMBER(JSON_VALUE(jsoncol, '$.info.age')) AS INTEGER)) > 60";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used
+            assertPlan((PhoenixResultSet) rs, "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertFalse(rs.next());
+
+            selectSql =
+                    "SELECT  D from " + dataTableName + " WHERE (CAST(TO_NUMBER(JSON_VALUE(jsoncol, '$.info.age')) AS INTEGER)) = 50";
+            rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is not used
+            assertPlan((PhoenixResultSet) rs, "", dataTableName);
+
+            // Add more rows to test the index write path
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id3', 50, 2, 9.5, 'c','" + String.format(
+                            json, 50) + "')");
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id4', 75, 2, 9.5, 'd','" + String.format(
+                            json, 75) + "')");
+
+            // Verify that index table includes only the rows with A > 50
+            selectSql = "SELECT * from " + indexTableName;
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(75, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(100, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Overwrite an existing row that satisfies the index WHERE clause
+            // such that the new version of the row does not satisfy the index where clause
+            // anymore. This should result in deleting the index row.
+            String dml =
+                    "UPSERT INTO " + dataTableName + " values ('id2', 0, 2, 9.5, 'd','" + String.format(
+                            json, 0) + "')";
+            conn.createStatement().execute(dml);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(75, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Retrieve the updated row from the data table and verify that the index table is not used
+            selectSql =
+                    "SELECT  ID from " + dataTableName + " WHERE (CAST(TO_NUMBER(JSON_VALUE(jsoncol, '$.info.age')) AS INTEGER)) = 0";
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertPlan((PhoenixResultSet) rs, "", dataTableName);
+            assertTrue(rs.next());
+            assertEquals("id2", rs.getString(1));
+
+            // Test index verification and repair by IndexTool
+            verifyIndex(dataTableName, indexTableName);
+
+            try (Connection newConn = DriverManager.getConnection(getUrl())) {
+                PTable indexTable = PhoenixRuntime.getTableNoCache(newConn, indexTableName);
+                assertTrue(StringUtils.deleteWhitespace(indexTable.getIndexWhere())
+                        .equals("CAST(TO_NUMBER(JSON_VALUE(JSONCOL,'$.info.age'))ASINTEGER)>50"));
+            }
+        }
+    }
+
+    @Test
+    public void testPartialIndexWithJsonExists() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar not null primary key, " +
+                    "A integer, B integer, C double, D varchar, jsoncol json)" +
+                    (salted ? " SALT_BUCKETS=4" : ""));
+            String indexTableName = generateUniqueName();
+            String jsonWithPathExists = "{\"info\":{\"address\":{\"exists\":true}}}";
+            String jsonWithoutPathExists = "{\"info\":{\"age\": 25 }}";
+            // Add rows to the data table before creating a partial index to test that the index
+            // will be built correctly by IndexTool
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id1', 70, 2, 3.14, 'a','" + jsonWithPathExists + "')");
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " (id, A, D, jsoncol) values ('id2', 100, 'b','" + jsonWithoutPathExists + "')");
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") + "INDEX " + indexTableName + " on " + dataTableName + " (A) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE JSON_EXISTS(JSONCOL, '$.info.address.exists') ASYNC");
+            IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
+
+            String selectSql =
+                    "SELECT " + (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ") +
+                            " A, D from " + dataTableName + " WHERE A > 60 AND JSON_EXISTS(jsoncol, '$.info.address.exists')";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used
+            assertPlan((PhoenixResultSet) rs, "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals(70, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertFalse(rs.next());
+
+            // Add more rows to test the index write path
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id3', 20, 2, 3.14, 'a','" + jsonWithPathExists + "')");
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id4', 90, 2, 3.14, 'a','" + jsonWithPathExists + "')");
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " (id, A, D, jsoncol) values ('id5', 150, 'b','" + jsonWithoutPathExists + "')");
+
+            // Verify that index table includes only the rows where jsonPath Exists
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(70, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(90, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertFalse(rs.next());
+
+            rs = conn.createStatement().executeQuery("SELECT Count(*) from " + dataTableName);
+            // Verify that the index table is not used
+            assertPlan((PhoenixResultSet) rs, "", dataTableName);
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+
+            // Overwrite an existing row that satisfies the index WHERE clause such that
+            // the new version of the row does not satisfy the index where clause anymore. This
+            // should result in deleting the index row.
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " (ID, B, jsoncol) values ('id4', null, '" + jsonWithoutPathExists + "')");
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(70, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertFalse(rs.next());
+
+            // Test index verification and repair by IndexTool
+            verifyIndex(dataTableName, indexTableName);
+
+            try (Connection newConn = DriverManager.getConnection(getUrl())) {
+                PTable indexTable = PhoenixRuntime.getTableNoCache(newConn, indexTableName);
+                assertTrue(StringUtils.deleteWhitespace(indexTable.getIndexWhere())
+                        .equals("JSON_EXISTS(JSONCOL,'$.info.address.exists')"));
+            }
+        }
+    }
+
+    @Test
     public void testPartialIndexWithIndexHint() throws Exception {
         try(Connection conn = DriverManager.getConnection(getUrl());
             Statement stmt = conn.createStatement()) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
new file mode 100644
index 0000000..e7de5db
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.spi.json.GsonJsonProvider;
+import org.apache.commons.io.FileUtils;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.bson.Document;
+import org.bson.RawBsonDocument;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(ParallelStatsDisabledTest.class)
+public class JsonFunctionsIT extends ParallelStatsDisabledIT {
+    public static String BASIC_JSON = "json/json_functions_basic.json";
+    public static String DATA_TYPES_JSON = "json/json_datatypes.json";
+    String basicJson = "";
+    String dataTypesJson = "";
+
+    @Before
+    public void setup() throws IOException {
+        basicJson = getJsonString(BASIC_JSON, "$[0]");
+        dataTypesJson = getJsonString(DATA_TYPES_JSON);
+    }
+
+    @Test
+    public void testSimpleJsonValue() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName + " (pk integer primary key, randomVal integer ,col integer, jsoncol json)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 123);
+            stmt.setInt(3, 2);
+            stmt.setString(4, basicJson);
+            stmt.execute();
+            conn.commit();
+
+            String queryTemplate ="SELECT pk, randomVal, JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town'), " +
+                    "JSON_VALUE(jsoncol, '$.info.tags[0]'), JSON_QUERY(jsoncol, '$.info.tags'), JSON_QUERY(jsoncol, '$.info'), " +
+                    "JSON_VALUE(jsoncol, '$.info.tags[1]') " +
+                    " FROM " + tableName +
+                    " WHERE JSON_VALUE(jsoncol, '$.name') = '%s'";
+            String query = String.format(queryTemplate, "AndersenFamily");
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertEquals("123", rs.getString(2));
+            assertEquals("Basic", rs.getString(3));
+            assertEquals("Bristol", rs.getString(4));
+            assertEquals("Sport", rs.getString(5));
+            // returned format is different
+            compareJson(rs.getString(6), basicJson, "$.info.tags");
+            compareJson(rs.getString(7), basicJson, "$.info");
+            assertEquals("Water polo", rs.getString(8));
+            assertFalse(rs.next());
+
+            // Now check for empty match
+            query = String.format(queryTemplate, "Windsors");
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            // check if the explain plan indicates server side execution
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertTrue(QueryUtil.getExplainPlan(rs).contains("    SERVER JSON FUNCTION PROJECTION"));
+        }
+    }
+
+    private void compareJson(String result, String json, String path) throws JsonProcessingException {
+        Configuration conf = Configuration.builder().jsonProvider(new GsonJsonProvider()).build();
+        Object read = JsonPath.using(conf).parse(json).read(path);
+        ObjectMapper mapper = new ObjectMapper();
+        assertEquals(mapper.readTree(read.toString()), mapper.readTree(result));
+    }
+
+    @Test
+    public void testSimpleJsonDatatypes() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table if not exists " + tableName + " (pk integer primary key, col integer, jsoncol json)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, dataTypesJson);
+            stmt.execute();
+            conn.commit();
+            ResultSet rs = conn.createStatement().executeQuery("SELECT JSON_VALUE(JSONCOL,'$.datatypes.stringtype'), " +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.inttype'), " +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.booltype'), " +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.booltypef'), " +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.doubletype')," +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.longtype')," +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.intArray[0]')," +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.intArray')," +
+                    "JSON_VALUE(JSONCOL, '$')," +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.nullcheck')," +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.noKey')," +
+                    "JSON_VALUE(JSONCOL, '$.datatypes.noKey.subkey')  FROM "
+                    + tableName + " WHERE JSON_VALUE(JSONCOL, '$.datatypes.stringtype')='someString'");
+            assertTrue(rs.next());
+            assertEquals("someString", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertEquals("true", rs.getString(3));
+            assertEquals("false", rs.getString(4));
+            assertEquals("2.5", rs.getString(5));
+            assertEquals("1490020778457845", rs.getString(6));
+            assertEquals("1", rs.getString(7));
+            assertEquals(null, rs.getString(8));
+            assertEquals(null, rs.getString(9));
+            assertEquals(null, rs.getString(10));
+            assertEquals(null, rs.getString(11));
+            assertEquals(null, rs.getString(12));
+        }
+    }
+
+    @Test
+    public void testJsonQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table if not exists " + tableName + " (pk integer primary key, col integer, jsoncol json)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, dataTypesJson);
+            stmt.execute();
+            conn.commit();
+            ResultSet rs = conn.createStatement().executeQuery("SELECT " +
+                    "JSON_QUERY(JSONCOL, '$.datatypes.intArray')," +
+                    "JSON_QUERY(JSONCOL, '$.datatypes.boolArray')," +
+                    "JSON_QUERY(JSONCOL, '$.datatypes.doubleArray')," +
+                    "JSON_QUERY(JSONCOL, '$.datatypes.stringArray')," +
+                    "JSON_QUERY(JSONCOL, '$.datatypes.mixedArray')  FROM "
+                    + tableName + " WHERE JSON_VALUE(JSONCOL, '$.datatypes.stringtype')='someString'");
+            assertTrue(rs.next());
+            compareJson(rs.getString(1), dataTypesJson, "$.datatypes.intArray");
+            compareJson(rs.getString(2), dataTypesJson, "$.datatypes.boolArray");
+            compareJson(rs.getString(3), dataTypesJson, "$.datatypes.doubleArray");
+            compareJson(rs.getString(4), dataTypesJson, "$.datatypes.stringArray");
+            compareJson(rs.getString(5), dataTypesJson, "$.datatypes.mixedArray");
+        }
+    }
+
+    @Test
+    public void testJsonExpressionIndex() throws IOException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String
+                    ddl =
+                    "create table if not exists " + tableName +
+                            " (pk integer primary key, col integer, jsoncol.jsoncol json) COLUMN_ENCODED_BYTES=0";
+            conn.createStatement().execute(ddl);
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) VALUES (1,2, '" + basicJson + "')");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) VALUES (2,3, '" + getJsonString(BASIC_JSON, "$[1]") + "')");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) VALUES (3,4, '" + getJsonString(BASIC_JSON, "$[2]") + "')");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) VALUES (4,5, '" + getJsonString(BASIC_JSON, "$[3]") + "')");
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) VALUES (5,6, '" + getJsonString(BASIC_JSON, "$[4]") + "')");
+            conn.commit();
+            conn.createStatement().execute(
+                    "CREATE INDEX " + indexName + " ON " + tableName
+                            + " (JSON_VALUE(JSONCOL,'$.type'), JSON_VALUE(JSONCOL,'$.info.address.town')) include (col)");
+            String
+                    selectSql =
+                    "SELECT JSON_VALUE(JSONCOL,'$.type'), " +
+                            "JSON_VALUE(JSONCOL,'$.info.address.town') FROM " + tableName +
+                            " WHERE JSON_VALUE(JSONCOL,'$.type') = 'Basic'";
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            IndexToolIT.assertExplainPlan(false, actualExplainPlan, tableName, indexName);
+            // Validate the total count of rows
+            String countSql = "SELECT COUNT(1) FROM " + tableName;
+            rs = conn.createStatement().executeQuery(countSql);
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            // Delete the rows
+            String deleteQuery = "DELETE FROM " + tableName + " WHERE JSON_VALUE(JSONCOL,'$.type') = 'Normal'";
+            conn.createStatement().execute(deleteQuery);
+            conn.commit();
+            rs = conn.createStatement().executeQuery(countSql);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            // Do a count now for the deleted rows, the count should be 0
+            selectSql = "SELECT COUNT(1) FROM " + tableName +
+                            " WHERE JSON_VALUE(JSONCOL,'$.type') = 'Normal'";
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+            // Drop the JSON column
+            conn.createStatement().execute("ALTER TABLE " + tableName + " DROP COLUMN jsoncol ");
+
+            // verify the both of the indexes' metadata were dropped
+            conn.createStatement().execute("SELECT * FROM " + tableName);
+            try {
+                conn.createStatement().execute("SELECT * FROM " + indexName);
+                fail("Index should have been dropped");
+            } catch (TableNotFoundException e) {
+            }
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            PTable dataTable = pconn.getTable(new PTableKey(null, tableName));
+            pconn = conn.unwrap(PhoenixConnection.class);
+            dataTable = pconn.getTable(new PTableKey(null, tableName));
+            try {
+                pconn.getTable(new PTableKey(null, indexName));
+                fail("index should have been dropped");
+            } catch (TableNotFoundException e) {
+            }
+            assertEquals("Unexpected number of indexes ", 0, dataTable.getIndexes().size());
+        } catch (SQLException e) {
+            assertFalse("Failed to execute test", true);
+        }
+    }
+
+    @Test
+    public void testJsonExpressionIndexInvalid() {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        checkInvalidJsonIndexExpression(props, tableName, indexName,
+                " (JSON_QUERY(JSONCOL,'$.info.address')) include (col)");
+    }
+
+    @Test
+    public void testJsonExists() throws SQLException, IOException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName + " (pk integer primary key, col integer, jsoncol json)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, basicJson);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setInt(2, 3);
+            stmt.setString(3, getJsonString(BASIC_JSON, "$[1]"));
+            stmt.execute();
+            conn.commit();
+
+            String query ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town') " +
+                    " FROM " + tableName +
+                    " WHERE JSON_EXISTS(jsoncol, '$.info.address.town')";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("Basic", rs.getString(1));
+            assertEquals("Bristol", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("Normal", rs.getString(1));
+            assertEquals("Bristol2", rs.getString(2));
+            assertFalse(rs.next());
+
+            query ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town') " +
+                    " FROM " + tableName +
+                    " WHERE JSON_EXISTS(jsoncol, '$.info.address.exists')";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("Bristol", rs.getString(2));
+            assertFalse(rs.next());
+
+            query ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town') " +
+                    " FROM " + tableName +
+                    " WHERE NOT JSON_EXISTS(jsoncol, '$.info.address.exists')";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("Bristol2", rs.getString(2));
+            assertFalse(rs.next());
+
+            query ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town') " +
+                    " FROM " + tableName +
+                    " WHERE JSON_EXISTS(jsoncol, '$.info.address.name')";
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            query ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town') " +
+                    " FROM " + tableName +
+                    " WHERE JSON_EXISTS(jsoncol, '$.existsFail')";
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            query ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town') " +
+                    " FROM " + tableName +
+                    " WHERE JSON_EXISTS(jsoncol, '$.existsFail[*]')";
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+        }
+    }
+
+    private void checkInvalidJsonIndexExpression(Properties props, String tableName,
+            String indexName, String indexExpression) {
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String
+                    ddl =
+                    "create table if not exists " + tableName + " (pk integer primary key, col integer, jsoncol.jsoncol json)";
+            conn.createStatement().execute(ddl);
+            conn.createStatement().execute(
+                    "UPSERT INTO " + tableName + " (pk, col, jsoncol) VALUES (1,2, '" + basicJson + "')");
+            conn.createStatement()
+                    .execute("CREATE INDEX " + indexName + " ON " + tableName + indexExpression);
+            conn.commit();
+        } catch (SQLException e) {
+            assertEquals(
+                    SQLExceptionCode.JSON_FRAGMENT_NOT_ALLOWED_IN_INDEX_EXPRESSION.getErrorCode(),
+                    e.getErrorCode());
+        }
+    }
+
+    private static String getJsonString(String jsonFilePath) throws IOException {
+        return getJsonString(jsonFilePath, "$");
+    }
+
+    private static String getJsonString(String jsonFilePath, String jsonPath) throws IOException {
+        URL fileUrl = JsonFunctionsIT.class.getClassLoader().getResource(jsonFilePath);
+        String json = FileUtils.readFileToString(new File(fileUrl.getFile()));
+        Configuration conf = Configuration.builder().jsonProvider(new GsonJsonProvider()).build();
+        Object read = JsonPath.using(conf).parse(json).read(jsonPath);
+        return read.toString();
+    }
+
+    /**
+     * This test case is used to check if the Server Side execution optimization doesn't take place
+     * when we include the complte JSON column. The case for optimization is covered in
+     * {@link #testSimpleJsonValue()}
+     * @throws Exception
+     */
+    @Test
+    public void testJsonFunctionOptimization() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName + " (pk integer primary key, col integer, jsoncol json)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, basicJson);
+            stmt.execute();
+            conn.commit();
+            String queryTemplate ="SELECT jsoncol, JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town'), " +
+                    "JSON_VALUE(jsoncol, '$.info.tags[1]'), JSON_QUERY(jsoncol, '$.info.tags'), JSON_QUERY(jsoncol, '$.info') " +
+                    " FROM " + tableName +
+                    " WHERE JSON_VALUE(jsoncol, '$.name') = '%s'";
+            String query = String.format(queryTemplate, "AndersenFamily");
+            // check if the explain plan indicates server side execution
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertFalse(QueryUtil.getExplainPlan(rs).contains("    SERVER JSON FUNCTION PROJECTION"));
+        }
+    }
+
+    @Test
+    public void testArrayIndexAndJsonFunctionExpressions() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName + " (pk integer primary key, col integer, jsoncol json, arr INTEGER ARRAY)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, basicJson);
+            Array array = conn.createArrayOf("INTEGER", new Integer[]{1, 2});
+            stmt.setArray(4, array);
+            stmt.execute();
+            conn.commit();
+            String query ="SELECT arr, arr[1], jsoncol, JSON_VALUE(jsoncol, '$.type')" +
+                    " FROM " + tableName +
+                    " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            // Since we are using complete array and json col, no server side execution
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            String explainPlan = QueryUtil.getExplainPlan(rs);
+            assertFalse(explainPlan.contains("    SERVER JSON FUNCTION PROJECTION"));
+            assertFalse(explainPlan.contains("    SERVER ARRAY ELEMENT PROJECTION"));
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(conn.createArrayOf("INTEGER", new Integer[]{1, 2}), rs.getArray(1));
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getString(4), "Basic");
+
+            // since we are using Array Index and Json function without full column, optimization
+            // should happen
+            query ="SELECT arr[1], JSON_VALUE(jsoncol, '$.type')" +
+                    " FROM " + tableName +
+                    " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            explainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(explainPlan.contains("    SERVER JSON FUNCTION PROJECTION"));
+            assertTrue(explainPlan.contains("    SERVER ARRAY ELEMENT PROJECTION"));
+
+            // only Array optimization and not Json
+            query ="SELECT arr[1], jsoncol, JSON_VALUE(jsoncol, '$.type')" +
+                    " FROM " + tableName +
+                    " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            explainPlan = QueryUtil.getExplainPlan(rs);
+            assertFalse(explainPlan.contains("    SERVER JSON FUNCTION PROJECTION"));
+            assertTrue(explainPlan.contains("    SERVER ARRAY ELEMENT PROJECTION"));
+
+            // only Json optimization and not Array Index
+            query ="SELECT arr, arr[1], JSON_VALUE(jsoncol, '$.type')" +
+                    " FROM " + tableName +
+                    " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            explainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(explainPlan.contains("    SERVER JSON FUNCTION PROJECTION"));
+            assertFalse(explainPlan.contains("    SERVER ARRAY ELEMENT PROJECTION"));
+        }
+    }
+
+    @Test
+    public void testServerFunctionsInDifferentOrders() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String
+                    ddl =
+                    "create table " + tableName + " (pk integer primary key, col integer, jsoncol json, arr INTEGER ARRAY, arr2 INTEGER ARRAY)";
+            conn.createStatement().execute(ddl);
+            PreparedStatement
+                    stmt =
+                    conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?,?,?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setString(3, basicJson);
+            Array array = conn.createArrayOf("INTEGER", new Integer[] { 1, 2 });
+            stmt.setArray(4, array);
+            Array array2 = conn.createArrayOf("INTEGER", new Integer[] { 3, 4 });
+            stmt.setArray(5, array2);
+            stmt.execute();
+            conn.commit();
+
+            // First Array elements, JSON_VALUE and then JSON_QUERY
+            String
+                    query =
+                    "SELECT arr, arr[1], arr2, arr2[1], jsoncol, " +
+                            "JSON_VALUE(jsoncol, '$.type'), " +
+                            "JSON_QUERY(jsoncol, '$.info') " +
+                            " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(rs.getArray(1), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getArray(3), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+            assertEquals(rs.getInt(4), 3);
+            compareJson(rs.getString(5), basicJson, "$");
+            assertEquals(rs.getString(6), "Basic");
+            compareJson(rs.getString(7), basicJson, "$.info");
+
+            // First JSON_VALUE, JSON_QUERY, ARRAY
+            query =
+                    "SELECT jsoncol, JSON_VALUE(jsoncol, '$.type'), " +
+                            "JSON_QUERY(jsoncol, '$.info'), arr, arr[1], arr2, arr2[1] " +
+                            " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            compareJson(rs.getString(1), basicJson, "$");
+            assertEquals(rs.getString(2), "Basic");
+            compareJson(rs.getString(3), basicJson, "$.info");
+            assertEquals(rs.getArray(4), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+            assertEquals(rs.getInt(5), 1);
+            assertEquals(rs.getArray(6), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+            assertEquals(rs.getInt(7), 3);
+
+            // First JSON_QUERY, ARRAY, JSON_VALUE
+            query =
+                    "SELECT JSON_QUERY(jsoncol, '$.info'), arr, arr[1], arr2, arr2[1], jsoncol, " +
+                            "JSON_VALUE(jsoncol, '$.type') " +
+                            " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            compareJson(rs.getString(1), basicJson, "$.info");
+            assertEquals(rs.getArray(2), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+            assertEquals(rs.getInt(3), 1);
+            assertEquals(rs.getArray(4), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+            assertEquals(rs.getInt(5), 3);
+            compareJson(rs.getString(6), basicJson, "$");
+            assertEquals(rs.getString(7), "Basic");
+
+            //JUMBLED FUNCTIONS
+            query =
+                    "SELECT JSON_QUERY(jsoncol, '$.info.tags'), " +
+                            "JSON_VALUE(jsoncol, '$.info.address.town'), arr,  arr[1], " +
+                            "JSON_QUERY(jsoncol, '$.info'), arr2, " +
+                            "JSON_VALUE(jsoncol, '$.info.tags[0]'), arr2[1], jsoncol, " +
+                            "JSON_VALUE(jsoncol, '$.type') " +
+                            " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            compareJson(rs.getString(1), basicJson, "$.info.tags");
+            assertEquals(rs.getString(2),"Bristol");
+            assertEquals(rs.getArray(3), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+            assertEquals(rs.getInt(4), 1);
+            compareJson(rs.getString(5), basicJson, "$.info");
+            assertEquals(rs.getArray(6), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+            assertEquals(rs.getString(7), "Sport");
+            assertEquals(rs.getInt(8), 3);
+            compareJson(rs.getString(9), basicJson, "$");
+            assertEquals(rs.getString(10), "Basic");
+
+        }
+    }
+
+    @Test
+    public void testJsonWithSetGetObjectAPI() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName + " (pk integer primary key, jsoncol json)";
+            conn.createStatement().execute(ddl);
+            // Set as String as get RawBsonDocument Object
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?)");
+            stmt.setInt(1, 1);
+            stmt.setString(2, basicJson);
+            stmt.execute();
+            conn.commit();
+
+            String query ="SELECT * FROM " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            RawBsonDocument rawBson = (RawBsonDocument) rs.getObject(2);
+            compareJson(rawBson.toJson(), basicJson, "$");
+            assertFalse(rs.next());
+
+            // Set as RawJsonDocument and get the same.
+            /**
+             * {
+             *   "info": {
+             *     "type": 1,
+             *     "address": {
+             *       "town": "Bristol",
+             *       "county": "Avon",
+             *       "country": "England",
+             *       "exists": true
+             *     },
+             *     "tags": [
+             *       "Sport",
+             *       "Water polo"
+             *     ]
+             *   },
+             *   "type": "Basic",
+             *   "name": "AndersenFamily"
+             * }
+             */
+            Document info = new Document()
+                    .append("info", new Document()
+                            .append("type", 1)
+                            .append("address", new Document()
+                                    .append("town", "Bristol")
+                                    .append("county", "Avon")
+                                    .append("country", "England")
+                                    .append("exists", true))
+                            .append("tags", Arrays.asList("Sport", "Water polo")))
+                    .append("type", "Basic")
+                    .append("name", "AndersenFamily");
+            RawBsonDocument document = RawBsonDocument.parse(info.toJson());
+            stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?)");
+            stmt.setInt(1, 2);
+            stmt.setObject(2, document);
+            stmt.execute();
+            conn.commit();
+
+            query ="SELECT * FROM " + tableName + " WHERE pk = 2";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            RawBsonDocument rawBsonDocument = (RawBsonDocument) rs.getObject(2);
+            compareJson(rawBsonDocument.toJson(), info.toJson(), "$");
+            assertFalse(rs.next());
+
+            // Set as RawJson and get as String
+            stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?)");
+            stmt.setInt(1, 3);
+            stmt.setObject(2, document);
+            stmt.execute();
+            conn.commit();
+
+            query ="SELECT * FROM " + tableName + " WHERE pk = 3";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            String jsonStr = rs.getString(2);
+            compareJson(jsonStr, info.toJson(), "$");
+            assertFalse(rs.next());
+        }
+    }
+}
diff --git a/phoenix-core/src/it/resources/json/json_datatypes.json b/phoenix-core/src/it/resources/json/json_datatypes.json
new file mode 100644
index 0000000..7cebac0
--- /dev/null
+++ b/phoenix-core/src/it/resources/json/json_datatypes.json
@@ -0,0 +1,36 @@
+{
+  "datatypes": {
+    "stringtype": "someString",
+    "inttype": 1,
+    "booltype": true,
+    "booltypef": false,
+    "doubletype": 2.5,
+    "longtype": 1490020778457845,
+    "intArray": [
+      1,
+      2,
+      3
+    ],
+    "nullcheck": null,
+    "boolArray": [
+      true,
+      false,
+      false
+    ],
+    "doubleArray": [
+      1.2,
+      2.3,
+      3.4
+    ],
+    "stringArray": [
+      "hello",
+      "world"
+    ],
+    "mixedArray": [
+      2,
+      "string",
+      1.2,
+      false
+    ]
+  }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/resources/json/json_functions_basic.json b/phoenix-core/src/it/resources/json/json_functions_basic.json
new file mode 100644
index 0000000..6b44afe
--- /dev/null
+++ b/phoenix-core/src/it/resources/json/json_functions_basic.json
@@ -0,0 +1,83 @@
+[
+  {
+  "info": {
+    "type": 1,
+    "address": {
+      "town": "Bristol",
+      "county": "Avon",
+      "country": "England",
+      "exists": true
+    },
+    "tags": [
+      "Sport",
+      "Water polo"
+    ]
+  },
+  "type": "Basic",
+  "name": "AndersenFamily"
+},
+  {
+    "info": {
+      "type": 2,
+      "address": {
+        "town": "Bristol2",
+        "county": "Avon2",
+        "country": "France"
+      },
+      "tags": [
+        "Cricket",
+        "Foot Ball"
+      ]
+    },
+    "type": "Normal",
+    "name": "SomeFamily"
+  },
+  {
+    "info": {
+      "type": 3,
+      "address": {
+        "town": "Bristol3",
+        "county": "Avon3",
+        "country": "Australia"
+      },
+      "tags": [
+        "Rugby",
+        "Ice Hockey"
+      ]
+    },
+    "type": "Advanced",
+    "name": "AdvancedFamily"
+  },
+  {
+    "info": {
+      "type": 4,
+      "address": {
+        "town": "Bristol4",
+        "county": "Avon4",
+        "country": "Antarctica"
+      },
+      "tags": [
+        "HorseRide",
+        "Dolphine"
+      ]
+    },
+    "type": "Advanced",
+    "name": "AntarcticaFamily"
+  },
+  {
+    "info": {
+      "type": 5,
+      "address": {
+        "town": "Bristol5",
+        "county": "Avon5",
+        "country": "Europe"
+      },
+      "tags": [
+        "Jumping",
+        "Hopping"
+      ]
+    },
+    "type": "Normal",
+    "name": "EuropeFamily"
+  }
+]
\ No newline at end of file
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 6e4d171..905c47d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -1043,13 +1043,13 @@
         PhoenixConnection pconn = DriverManager.getConnection(getUrl(),
                 PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
         String ddl = "create table myTable(ID varchar primary key, A integer, B varchar, " +
-                "C date, D double, E integer)";
+                "C date, D double, E integer, F json)";
         pconn.createStatement().execute(ddl);
         ddl = "create table myTableDesc(ID varchar primary key DESC, A integer, B varchar, " +
-                "C date, D double, E integer)";
+                "C date, D double, E integer, F json)";
         pconn.createStatement().execute(ddl);
 
-        final int NUM = 15;
+        final int NUM = 20;
         String[] containingQueries = new String[NUM];
         String[] containedQueries = new String[NUM];
 
@@ -1106,6 +1106,22 @@
         containedQueries[14] = "select * from myTable where " +
                 " CURRENT_DATE() - PHOENIX_ROW_TIMESTAMP() < 5 ";
 
+        containingQueries[15] = "select * from myTable where ID > 'i3' and A > 1 and JSON_VALUE(F, '$.type') > 'i3'";
+        containedQueries[15] = "select * from myTableDesc where (ID > 'i7' or ID = 'i4') and " +
+                "A > 2 * 10 and (JSON_VALUE(F, '$.type') > 'i7' or JSON_VALUE(F, '$.type') = 'i4')";
+
+        containingQueries[16] = "select * from myTable where JSON_VALUE(F, '$.type') is not null";
+        containedQueries[16] = "select * from myTable where JSON_VALUE(F, '$.type') > 'i3'";
+
+        containingQueries[17] = "select * from myTable where JSON_VALUE(F, '$.type') like '%abc'";
+        containedQueries[17] = "select * from myTable where (JSON_VALUE(F, '$.type') like '%abc' and ID > 'i1')";
+
+        containingQueries[18] = "select * from myTable where JSON_EXISTS(F, '$.type')";
+        containedQueries[18] = "select * from myTable where JSON_EXISTS(F, '$.type') and JSON_VALUE(F, '$.type') > 'i3'";
+
+        containingQueries[19] = "select * from myTable where JSON_VALUE(F, '$.type') IN ('i3', 'i7', 'i1') and A < 10";
+        containedQueries[19] = "select * from myTableDesc where JSON_VALUE(F, '$.type') IN ('i1', 'i7') and A < 10 / 2";
+
         for (int i = 0; i < NUM; i++) {
             Assert.assertTrue(WhereCompiler.contains(getDNF(pconn, containingQueries[i]),
                     getDNF(pconn, containedQueries[i])));
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java
index b7baa97..c625972 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/expression/CoerceExpressionTest.java
@@ -28,15 +28,15 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PJson;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.junit.Test;
 
-import org.apache.phoenix.schema.types.PDataType;
-
 /**
  * Test class for unit-testing {@link CoerceExpression}
  * 
@@ -99,15 +99,13 @@
         Long value = (Long)obj;
         assertTrue(value.equals(Long.valueOf(1)));
     }
-	
+
 	@Test
 	public void testCoerceExpressionSupportsCoercingAllPDataTypesToVarBinary() throws Exception {
-		for(PDataType p : PDataType.values()) {
-			if (!p.isArrayType()) {
-				LiteralExpression v = LiteralExpression.newConstant(
-						map.get(p.getJavaClass()), p);
-				CoerceExpression e = new CoerceExpression(v,
-            PVarbinary.INSTANCE);
+		for (PDataType p : PDataType.values()) {
+			if (!p.isArrayType() && !p.equals(PJson.INSTANCE)) {
+				LiteralExpression v = LiteralExpression.newConstant(map.get(p.getJavaClass()), p);
+				CoerceExpression e = new CoerceExpression(v, PVarbinary.INSTANCE);
 				ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 				e.evaluate(null, ptr);
 				Object obj = e.getDataType().toObject(ptr);
@@ -116,22 +114,18 @@
 			}
 		}
 	}
-	
 
 	@Test
-    public void testCoerceExpressionSupportsCoercingAllPDataTypesToBinary() throws Exception {
-		for(PDataType p : PDataType.values()) {
-			if (!p.isArrayType()) {
-				LiteralExpression v = LiteralExpression.newConstant(
-						map.get(p.getJavaClass()), p);
+	public void testCoerceExpressionSupportsCoercingAllPDataTypesToBinary() throws Exception {
+		for (PDataType p : PDataType.values()) {
+			if (!p.isArrayType() && !p.equals(PJson.INSTANCE)) {
+				LiteralExpression v = LiteralExpression.newConstant(map.get(p.getJavaClass()), p);
 				CoerceExpression e = new CoerceExpression(v, PBinary.INSTANCE);
 				ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 				e.evaluate(null, ptr);
 				Object obj = e.getDataType().toObject(ptr);
-				assertTrue("Coercing to BINARY failed for PDataType " + p,
-						obj instanceof byte[]);
+				assertTrue("Coercing to BINARY failed for PDataType " + p, obj instanceof byte[]);
 			}
 		}
-    }
-	
+	}
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
index 02fd0c7..74c5a99 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
@@ -1759,6 +1759,7 @@
              + "FLOAT ARRAY=[BINARY ARRAY, DECIMAL ARRAY, DOUBLE ARRAY, FLOAT ARRAY, VARBINARY ARRAY], "
              + "INTEGER=[BIGINT, BINARY, DECIMAL, DOUBLE, FLOAT, INTEGER, VARBINARY], "
              + "INTEGER ARRAY=[BIGINT ARRAY, BINARY ARRAY, DECIMAL ARRAY, DOUBLE ARRAY, FLOAT ARRAY, INTEGER ARRAY, VARBINARY ARRAY], "
+             + "JSON=[BINARY, JSON, VARBINARY], "
              + "SMALLINT=[BIGINT, BINARY, DECIMAL, DOUBLE, FLOAT, INTEGER, SMALLINT, VARBINARY], "
              + "SMALLINT ARRAY=[BIGINT ARRAY, BINARY ARRAY, DECIMAL ARRAY, DOUBLE ARRAY, FLOAT ARRAY, INTEGER ARRAY, SMALLINT ARRAY, VARBINARY ARRAY], "
              + "TIME=[BINARY, DATE, TIME, TIMESTAMP, VARBINARY], "
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java
index e0aa056..339b191 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexUtilTest.java
@@ -138,6 +138,10 @@
     private void verifyIndexColumnDataTypes(boolean isNullable, String expected) {
         Map<String, String> indexColumnDataTypes = Maps.newTreeMap();
         for (PDataType dataType : PDataTypeFactory.getInstance().getTypes()) {
+            if (!dataType.isComparisonSupported()) {
+                // JSON Datatype can't be an IndexColumn
+                continue;
+            }
             String indexColumnDataType = "unsupported";
             try {
                 indexColumnDataType = IndexUtil.getIndexColumnDataType(isNullable, dataType).toString();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
index 0cf7009..5460e08 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
@@ -165,6 +165,10 @@
                     // JIRA - https://issues.apache.org/jira/browse/PHOENIX-1329
                     continue;
                 }
+                // Condition to check if a type can be used as a primary key.
+                if (!pType.canBePrimaryKey()) {
+                    continue;
+                }
                 if (pType.isArrayType() && PDataType.arrayBaseType(pType).isFixedWidth() && PDataType.arrayBaseType(pType).getByteSize() == null) {
                     // Need to treat array type whose base type is of fixed width whose byte size is not known as a special case. 
                     // Cannot just use the sql type name returned by PDataType.getSqlTypeName().
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
index 3bbe728..09bf5a3 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
@@ -33,7 +33,8 @@
     BOOLEAN("BOOLEAN", Types.BOOLEAN),
     BIGINT("BIGINT", Types.BIGINT),
     UNSIGNED_INT("UNSIGNED_INT", Types.INTEGER),
-    TINYINT("TINYINT", Types.TINYINT);
+    TINYINT("TINYINT", Types.TINYINT),
+    JSON("JSON", Types.VARBINARY);
 
     private final String sType;
 
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
index c47798c..8172d2c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
@@ -60,13 +60,14 @@
         while (m.find()) {
             String dynamicField = m.group(0).replace("[", "").replace("]", "");
             Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
-            needQuotes =
-                    (dynamicColumn.getType() == DataTypeMapping.CHAR
-                            || dynamicColumn.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
-            ret =
-                    ret.replace("[" + dynamicField + "]",
-                            needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue()
-                                    + needQuotes);
+            // For Json we can have queries like info[5].name and it should not match
+            if (dynamicColumn != null && dynamicColumn.getType() != null) {
+                needQuotes = (dynamicColumn.getType() == DataTypeMapping.CHAR
+                        || dynamicColumn.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
+                ret = ret.replace("[" + dynamicField + "]",
+                                needQuotes + ruleApplier.getDataValue(dynamicColumn)
+                                        .getValue() + needQuotes);
+            }
         }
         return ret;
     }
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
index a75e99f..52d7885 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
@@ -155,7 +155,8 @@
         List<Scenario> scenarios = dataModel != null ? dataModel.getScenarios() : parser.getScenarios();
         DataValue value = null;
         if (scenarios.contains(scenario)) {
-            LOGGER.debug("We found a correct Scenario" + scenario.getName());
+            LOGGER.debug("We found a correct Scenario" + scenario.getName() +
+                    "column " + phxMetaColumn.getName() + " " + phxMetaColumn.getType());
             
             Map<DataTypeMapping, List> overrideRuleMap = this.getCachedScenarioOverrides(scenario);
             
@@ -163,7 +164,7 @@
 	            List<Column> overrideRuleList = this.getCachedScenarioOverrides(scenario).get(phxMetaColumn.getType());
 	            
 				if (overrideRuleList != null && overrideRuleList.contains(phxMetaColumn)) {
-					LOGGER.debug("We found a correct override column rule");
+                    LOGGER.debug("We found a correct override column rule" + overrideRuleList);
 					Column columnRule = getColumnForRuleOverride(overrideRuleList, phxMetaColumn);
 					if (columnRule != null) {
 						return getDataValue(columnRule);
@@ -178,7 +179,7 @@
             // Make sure Column from Phoenix Metadata matches a rule column
             if (ruleList != null && ruleList.contains(phxMetaColumn)) {
                 // Generate some random data based on this rule
-                LOGGER.debug("We found a correct column rule");
+                LOGGER.debug("We found a correct column rule" + ruleList);
                 Column columnRule = getColumnForRule(ruleList, phxMetaColumn);
 
                 value = getDataValue(columnRule);
@@ -222,107 +223,116 @@
         }
 
         switch (column.getType()) {
-            case VARCHAR:
-            case VARBINARY:
-            case CHAR:
-                // Use the specified data values from configs if they exist
-                if (DataSequence.SEQUENTIAL.equals(column.getDataSequence())) {
-                    RuleBasedDataGenerator generator = getRuleBasedDataGeneratorForColumn(column);
-                    data = generator.getDataValue();
-                } else if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
-                    data = pickDataValueFromList(dataValues);
-                } else {
-                    Preconditions.checkArgument(length > 0, "length needs to be > 0");
-                    data = getRandomDataValue(column);
-                }
-                break;
-            case VARCHAR_ARRAY:
-            	//only list datavalues are supported
-            	String arr = "";
-            	for (DataValue dv : dataValues) {
-            		arr += "," + dv.getValue();
-            	}
-            	if (arr.startsWith(",")) {
-            		arr = arr.replaceFirst(",", "");
-            	}
-            	data = new DataValue(column.getType(), arr);
-            	break;
-            case DECIMAL:
-                if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
-                    data = pickDataValueFromList(dataValues);
-                } else {
-                    int precision = column.getPrecision();
-                    double minDbl = column.getMinValue();
-                    Preconditions.checkArgument((precision > 0) && (precision <= 18), "Precision must be between 0 and 18");
-                    Preconditions.checkArgument(minDbl >= 0, "minvalue must be set in configuration for decimal");
-                    Preconditions.checkArgument(column.getMaxValue() > 0, "maxValue must be set in configuration decimal");
-                    StringBuilder maxValueStr = new StringBuilder();
+        case VARCHAR:
+        case VARBINARY:
+        case JSON:
+        case CHAR:
+            // Use the specified data values from configs if they exist
+            if (DataSequence.SEQUENTIAL.equals(column.getDataSequence())) {
+                RuleBasedDataGenerator generator = getRuleBasedDataGeneratorForColumn(column);
+                data = generator.getDataValue();
+            } else if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
+                data = pickDataValueFromList(dataValues);
+            } else {
+                Preconditions.checkArgument(length > 0, "length needs to be > 0");
+                data = getRandomDataValue(column);
+            }
+            break;
+        case VARCHAR_ARRAY:
+            //only list datavalues are supported
+            String arr = "";
+            for (DataValue dv : dataValues) {
+                arr += "," + dv.getValue();
+            }
+            if (arr.startsWith(",")) {
+                arr = arr.replaceFirst(",", "");
+            }
+            data = new DataValue(column.getType(), arr);
+            break;
+        case DECIMAL:
+            if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
+                data = pickDataValueFromList(dataValues);
+            } else {
+                int precision = column.getPrecision();
+                double minDbl = column.getMinValue();
+                Preconditions.checkArgument((precision > 0) && (precision <= 18),
+                        "Precision must be between 0 and 18");
+                Preconditions.checkArgument(minDbl >= 0,
+                        "minvalue must be set in configuration for decimal");
+                Preconditions.checkArgument(column.getMaxValue() > 0,
+                        "maxValue must be set in configuration decimal");
+                StringBuilder maxValueStr = new StringBuilder();
 
-                    for (int i = 0; i < precision; i++) {
-                        maxValueStr.append(9);
-                    }
+                for (int i = 0; i < precision; i++) {
+                    maxValueStr.append(9);
+                }
 
-                    double maxDbl = Math.min(column.getMaxValue(), Double.parseDouble(maxValueStr.toString()));
-                    final double dbl = RandomUtils.nextDouble(minDbl, maxDbl);
-                    data = new DataValue(column.getType(), String.valueOf(dbl));
+                double maxDbl =
+                        Math.min(column.getMaxValue(), Double.parseDouble(maxValueStr.toString()));
+                final double dbl = RandomUtils.nextDouble(minDbl, maxDbl);
+                data = new DataValue(column.getType(), String.valueOf(dbl));
+            }
+            break;
+        case TINYINT:
+        case INTEGER:
+            if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
+                data = pickDataValueFromList(dataValues);
+            } else if (DataSequence.SEQUENTIAL.equals(column.getDataSequence())) {
+                RuleBasedDataGenerator generator = getRuleBasedDataGeneratorForColumn(column);
+                data = generator.getDataValue();
+            } else {
+                int minInt = (int) column.getMinValue();
+                int maxInt = (int) column.getMaxValue();
+                if (column.getType() == DataTypeMapping.TINYINT) {
+                    Preconditions.checkArgument((minInt >= -128) && (minInt <= 128),
+                            "min value need to be set in configuration for tinyints " + column.getName());
+                    Preconditions.checkArgument((maxInt >= -128) && (maxInt <= 128),
+                            "max value need to be set in configuration for tinyints " + column.getName());
                 }
-                break;
-            case TINYINT:
-            case INTEGER:
-                if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
-                    data = pickDataValueFromList(dataValues);
-                } else if(DataSequence.SEQUENTIAL.equals(column.getDataSequence())) {
-                    RuleBasedDataGenerator generator = getRuleBasedDataGeneratorForColumn(column);
-                    data = generator.getDataValue();
-                } else {
-                    int minInt = (int) column.getMinValue();
-                    int maxInt = (int) column.getMaxValue();
-                    if (column.getType() == DataTypeMapping.TINYINT) {
-                        Preconditions.checkArgument((minInt >= -128) && (minInt <= 128), "min value need to be set in configuration for tinyints " + column.getName());
-                        Preconditions.checkArgument((maxInt >= -128) && (maxInt <= 128), "max value need to be set in configuration for tinyints " + column.getName());
-                    }
-                    int intVal = ThreadLocalRandom.current().nextInt(minInt, maxInt + 1);
-                    data = new DataValue(column.getType(), String.valueOf(intVal));
-                }
-                break;
-            case BIGINT:
-            case UNSIGNED_LONG:
-                if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
-                    data = pickDataValueFromList(dataValues);
-                } else {
-                    long minLong = column.getMinValue();
-                    long maxLong = column.getMaxValue();
-                    if (column.getType() == DataTypeMapping.UNSIGNED_LONG)
-                        Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
-                    long longVal = RandomUtils.nextLong(minLong, maxLong);
-                    data = new DataValue(column.getType(), String.valueOf(longVal));
-                }
-                break;
-            case DATE:
-            case TIMESTAMP:
-                if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
-                    data = pickDataValueFromList(dataValues);
-                    // Check if date has right format or not
-                    data.setValue(checkDatePattern(data.getValue()));
-                } else if(DataSequence.SEQUENTIAL.equals(column.getDataSequence())) {
-                    RuleBasedDataGenerator generator = getRuleBasedDataGeneratorForColumn(column);
-                    data = generator.getDataValue();
-                } else if (column.getUseCurrentDate() != true){
-                    int minYear = (int) column.getMinValue();
-                    int maxYear = (int) column.getMaxValue();
-                    Preconditions.checkArgument((minYear > 0) && (maxYear > 0), "min and max values need to be set in configuration for date/timestamps " + column.getName());
+                int intVal = ThreadLocalRandom.current().nextInt(minInt, maxInt + 1);
+                data = new DataValue(column.getType(), String.valueOf(intVal));
+            }
+            break;
+        case BIGINT:
+        case UNSIGNED_LONG:
+            if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
+                data = pickDataValueFromList(dataValues);
+            } else {
+                long minLong = column.getMinValue();
+                long maxLong = column.getMaxValue();
+                if (column.getType() == DataTypeMapping.UNSIGNED_LONG)
+                    Preconditions.checkArgument((minLong > 0) && (maxLong > 0),
+                            "min and max values need to be set in configuration for unsigned_longs " + column.getName());
+                long longVal = RandomUtils.nextLong(minLong, maxLong);
+                data = new DataValue(column.getType(), String.valueOf(longVal));
+            }
+            break;
+        case DATE:
+        case TIMESTAMP:
+            if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
+                data = pickDataValueFromList(dataValues);
+                // Check if date has right format or not
+                data.setValue(checkDatePattern(data.getValue()));
+            } else if (DataSequence.SEQUENTIAL.equals(column.getDataSequence())) {
+                RuleBasedDataGenerator generator = getRuleBasedDataGeneratorForColumn(column);
+                data = generator.getDataValue();
+            } else if (column.getUseCurrentDate() != true) {
+                int minYear = (int) column.getMinValue();
+                int maxYear = (int) column.getMaxValue();
+                Preconditions.checkArgument((minYear > 0) && (maxYear > 0),
+                        "min and max values need to be set in configuration for date/timestamps " + column.getName());
 
-                    String dt = generateRandomDate(minYear, maxYear);
-                    data = new DataValue(column.getType(), dt);
-                    data.setMaxValue(String.valueOf(minYear));
-                    data.setMinValue(String.valueOf(maxYear));
-                } else {
-                    String dt = getCurrentDate();
-                    data = new DataValue(column.getType(), dt);
-                }
-                break;
-            default:
-                break;
+                String dt = generateRandomDate(minYear, maxYear);
+                data = new DataValue(column.getType(), dt);
+                data.setMaxValue(String.valueOf(minYear));
+                data.setMinValue(String.valueOf(maxYear));
+            } else {
+                String dt = getCurrentDate();
+                data = new DataValue(column.getType(), dt);
+            }
+            break;
+        default:
+            break;
         }
         Preconditions.checkArgument(data != null,
                 "Data value could not be generated for some reason. Please check configs");
@@ -569,6 +579,7 @@
             //For now we only have couple of these, likely this should replace for all the methods
             switch (column.getType()) {
             case VARCHAR:
+            case JSON:
             case VARBINARY:
             case CHAR:
                 if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialListDataGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialListDataGenerator.java
index ddcc354..3b3d4ed 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialListDataGenerator.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialListDataGenerator.java
@@ -57,6 +57,7 @@
         switch (mapping) {
         case VARCHAR:
         case VARBINARY:
+        case JSON:
         case CHAR:
             return true;
         default:
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialVarcharDataGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialVarcharDataGenerator.java
index 9ae23b8..92e7080 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialVarcharDataGenerator.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialVarcharDataGenerator.java
@@ -66,6 +66,7 @@
         switch (mapping) {
         case VARCHAR:
         case VARBINARY:
+        case JSON:
         case CHAR:
             return true;
         default:
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 90793c1..4c1b6b4 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -21,7 +21,6 @@
 import com.google.gson.Gson;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
 import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.configuration.Column;
@@ -518,6 +517,13 @@
                     statement.setString(count, dataValue.getValue());
                 }
                 break;
+            case JSON:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.VARBINARY);
+                } else {
+                    statement.setString(count, dataValue.getValue());
+                }
+                break;
             case CHAR:
                 if (dataValue.getValue().equals("")) {
                     statement.setNull(count, Types.CHAR);