[CARBONDATA-3514] Support Spark 2.4.4 integration

This PR adds integration with Spark 2.4.4

This closes #3378
diff --git a/README.md b/README.md
index a34784d..2f661ed 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,8 @@
 
 
 ## Status
-Spark2.2:
-[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.2)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
+Spark2.3:
+[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.3)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
 [![Coverage Status](https://coveralls.io/repos/github/apache/carbondata/badge.svg?branch=master)](https://coveralls.io/github/apache/carbondata?branch=master)
 <a href="https://scan.coverity.com/projects/carbondata">
   <img alt="Coverity Scan Build Status"
diff --git a/build/README.md b/build/README.md
index f361a6e..960ccce 100644
--- a/build/README.md
+++ b/build/README.md
@@ -25,11 +25,9 @@
 * [Apache Thrift 0.9.3](http://archive.apache.org/dist/thrift/0.9.3/)
 
 ## Build command
-Build with different supported versions of Spark, by default using Spark 2.2.1 to build
+Build with different supported versions of Spark, by default using Spark 2.4.4
 ```
-mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package
-mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 clean package
-mvn -DskipTests -Pspark-2.3 -Dspark.version=2.3.2 clean package
+mvn -DskipTests -Pspark-2.4 clean package
 ```
 
 Note:
@@ -39,5 +37,5 @@
 ## For contributors : To build the format code after any changes, please follow the below command.
 Note:Need install Apache Thrift 0.9.3
 ```
-mvn clean -DskipTests -Pbuild-with-format -Pspark-2.2 package
+mvn clean -DskipTests -Pbuild-with-format -Pspark-2.4 package
 ```
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index d0e5a42..c18298d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -27,11 +27,11 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -42,12 +42,15 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
 
 /**
  * Stores datamap schema in disk as json format
  */
 public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStorageProvider {
 
+  private Logger LOG = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
+
   private String storePath;
 
   private String mdtFilePath;
@@ -171,17 +174,15 @@
     if (!FileFactory.isFileExist(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
     }
-    Iterator<DataMapSchema> iterator = dataMapSchemas.iterator();
-    while (iterator.hasNext()) {
-      DataMapSchema schema = iterator.next();
-      if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) {
-        iterator.remove();
-      }
-    }
+
+    LOG.info(String.format("Trying to delete DataMap %s schema", dataMapName));
+
+    dataMapSchemas.removeIf(schema -> schema.getDataMapName().equalsIgnoreCase(dataMapName));
     touchMDTFile();
     if (!FileFactory.deleteFile(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
     }
+    LOG.info(String.format("DataMap %s schema is deleted", dataMapName));
   }
 
   private void checkAndReloadDataMapSchemas(boolean touchFile) throws IOException {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index b32367b..08841b6 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -373,7 +373,8 @@
 
   def updateColumnName(attr: Attribute, counter: Int): String = {
     val name = getUpdatedName(attr.name, counter)
-    attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
+    val value = attr.qualifier.map(qualifier => qualifier + "_" + name)
+    if (value.nonEmpty) value.head else name
   }
 
   def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
@@ -473,7 +474,7 @@
   }
 
   def createAttrReference(ref: NamedExpression, name: String): Alias = {
-    Alias(ref, name)(exprId = ref.exprId, qualifier = None)
+    CarbonToSparkAdapter.createAliasRef(ref, name, exprId = ref.exprId)
   }
 
   case class AttributeKey(exp: Expression) {
@@ -537,13 +538,13 @@
         case attr: AttributeReference =>
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
-              CarbonToSparkAdapter.createAttributeReference(a.name,
-                a.dataType,
-                a.nullable,
-                a.metadata,
-                a.exprId,
-                attr.qualifier,
-                a)
+              CarbonToSparkAdapter.createAttributeReference(
+                name = a.name,
+                dataType = a.dataType,
+                nullable = a.nullable,
+                metadata = a.metadata,
+                exprId = a.exprId,
+                qualifier = attr.qualifier)
             } else {
               a
             }
@@ -575,9 +576,9 @@
     outputSel.zip(subsumerOutputList).map{ case (l, r) =>
       l match {
         case attr: AttributeReference =>
-          Alias(attr, r.name)(r.exprId, None)
+          CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
         case a@Alias(attr: AttributeReference, name) =>
-          Alias(attr, r.name)(r.exprId, None)
+          CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
         case other => other
       }
     }
@@ -594,13 +595,13 @@
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
               CarbonToSparkAdapter
-                .createAttributeReference(a.name,
+                .createAttributeReference(
+                  a.name,
                   a.dataType,
                   a.nullable,
                   a.metadata,
                   a.exprId,
-                  attr.qualifier,
-                  a)
+                  attr.qualifier)
             } else {
               a
             }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 3ddb0fc..cff5c41 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -125,17 +125,18 @@
             arrayBuffer += relation
           }
           var qualifier: Option[String] = None
-          if (attr.qualifier.isDefined) {
-            qualifier = if (attr.qualifier.get.startsWith("gen_sub")) {
+          if (attr.qualifier.nonEmpty) {
+            qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
               Some(carbonTable.getTableName)
             } else {
-              attr.qualifier
+              attr.qualifier.headOption
             }
           }
           fieldToDataMapFieldMap +=
-          getFieldToDataMapFields(attr.name,
+          getFieldToDataMapFields(
+            attr.name,
             attr.dataType,
-            qualifier,
+            qualifier.headOption,
             "",
             arrayBuffer,
             carbonTable.getTableName)
@@ -248,7 +249,8 @@
   /**
    * Below method will be used to get the fields object for mv table
    */
-  private def getFieldToDataMapFields(name: String,
+  private def getFieldToDataMapFields(
+      name: String,
       dataType: DataType,
       qualifier: Option[String],
       aggregateType: String,
@@ -313,7 +315,7 @@
     val updatedOutList = outputList.map { col =>
       val duplicateColumn = duplicateNameCols
         .find(a => a.semanticEquals(col))
-      val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
+      val qualifiedName = col.qualifier.headOption.getOrElse(s"${ col.exprId.id }") + "_" + col.name
       if (duplicateColumn.isDefined) {
         val attributesOfDuplicateCol = duplicateColumn.get.collect {
           case a: AttributeReference => a
@@ -329,7 +331,7 @@
           attributeOfCol.exists(a => a.semanticEquals(expr)))
         if (!isStrictDuplicate) {
           Alias(col, qualifiedName)(exprId = col.exprId)
-        } else if (col.qualifier.isDefined) {
+        } else if (col.qualifier.nonEmpty) {
           Alias(col, qualifiedName)(exprId = col.exprId)
           // this check is added in scenario where the column is direct Attribute reference and
           // since duplicate columns select is allowed, we should just put alias for those columns
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 7e8eb96..6fbc87f 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -18,11 +18,12 @@
 
 package org.apache.carbondata.mv.rewrite
 
+import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, Metadata}
 
 import org.apache.carbondata.mv.datamap.MVHelper
 import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
@@ -95,9 +96,12 @@
       // Replace all compensation1 attributes with refrences of subsumer attributeset
       val compensationFinal = compensation1.transformExpressions {
         case ref: Attribute if subqueryAttributeSet.contains(ref) =>
-          AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
+          CarbonToSparkAdapter.createAttributeReference(
+            ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
+            exprId = ref.exprId, qualifier = subsumerName)
         case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
-          Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
+          CarbonToSparkAdapter.createAliasRef(
+            alias.child, alias.name, alias.exprId, subsumerName)
       }
       compensationFinal
     } else {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index cb2043e..2b4247e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -17,10 +17,11 @@
 
 package org.apache.carbondata.mv.plans.modular
 
+import org.apache.spark.sql.{CarbonToSparkAdapter, SQLConf}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.types.Metadata
 
 import org.apache.carbondata.mv.plans
 import org.apache.carbondata.mv.plans._
@@ -198,18 +199,18 @@
                                                               .isInstanceOf[Attribute]))
           val aggOutputList = aggTransMap.values.flatMap(t => t._2)
             .map { ref =>
-              AttributeReference(ref.name, ref.dataType)(
-                exprId = ref.exprId,
-                qualifier = Some(hFactName))
+              CarbonToSparkAdapter.createAttributeReference(
+                ref.name, ref.dataType, nullable = true, Metadata.empty,
+                ref.exprId, Some(hFactName))
             }
           val hFactOutputSet = hFact.outputSet
           // Update the outputlist qualifier
           val hOutputList = (attrOutputList ++ aggOutputList).map {attr =>
             attr.transform {
               case ref: Attribute if hFactOutputSet.contains(ref) =>
-                AttributeReference(ref.name, ref.dataType)(
-                  exprId = ref.exprId,
-                  qualifier = Some(hFactName))
+                CarbonToSparkAdapter.createAttributeReference(
+                  ref.name, ref.dataType, nullable = true, Metadata.empty,
+                  ref.exprId, Some(hFactName))
             }
           }.asInstanceOf[Seq[NamedExpression]]
 
@@ -217,9 +218,9 @@
           val hPredList = s.predicateList.map{ pred =>
             pred.transform {
               case ref: Attribute if hFactOutputSet.contains(ref) =>
-                AttributeReference(ref.name, ref.dataType)(
-                  exprId = ref.exprId,
-                  qualifier = Some(hFactName))
+                CarbonToSparkAdapter.createAttributeReference(
+                  ref.name, ref.dataType, nullable = true, Metadata.empty,
+                  ref.exprId, Some(hFactName))
             }
           }
           val hSel = s.copy(
@@ -241,9 +242,9 @@
           val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
           wip.transformExpressions {
             case ref: Attribute if hFactOutputSet.contains(ref) =>
-              AttributeReference(ref.name, ref.dataType)(
-                exprId = ref.exprId,
-                qualifier = Some(hFactName))
+              CarbonToSparkAdapter.createAttributeReference(
+                ref.name, ref.dataType, nullable = true, Metadata.empty,
+                ref.exprId, Some(hFactName))
           }
         }
       }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index 30857c8..b694e78 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -52,9 +52,11 @@
       plan transform {
         case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _, _), _, _, _) =>
           val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
-          val makeupmap = children.zipWithIndex.flatMap {
+          val makeupmap: Map[Int, String] = children.zipWithIndex.flatMap {
             case (child, i) =>
-              aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+              aq.find(child.outputSet.contains(_))
+                .flatMap(_.qualifier.headOption)
+                .map((i, _))
           }.toMap
           g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
       }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 0bbacc4..7068b7e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -110,10 +110,7 @@
         RewriteCorrelatedScalarSubquery,
         EliminateSerialization,
         SparkSQLUtil.getRemoveRedundantAliasesObj(),
-        RemoveRedundantProject,
-        SimplifyCreateStructOps,
-        SimplifyCreateArrayOps,
-        SimplifyCreateMapOps) ++
+        RemoveRedundantProject) ++
                                             extendedOperatorOptimizationRules: _*) ::
     Batch(
       "Check Cartesian Products", Once,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index 3b6c725..2033342 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -167,7 +167,9 @@
     val aq = attributeSet.filter(_.qualifier.nonEmpty)
     children.zipWithIndex.flatMap {
       case (child, i) =>
-        aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+        aq.find(child.outputSet.contains(_))
+          .flatMap(_.qualifier.headOption)
+          .map((i, _))
     }.toMap
   }
 
@@ -353,28 +355,13 @@
           Seq.empty)
       case l: LogicalRelation =>
         val tableIdentifier = l.catalogTable.map(_.identifier)
-        val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
-        val table = tableIdentifier.map(_.table).getOrElse(null)
+        val database = tableIdentifier.flatMap(_.database).orNull
+        val table = tableIdentifier.map(_.table).orNull
         Some(database, table, l.output, Nil, NoFlags, Seq.empty)
       case l: LocalRelation => // used for unit test
         Some(null, null, l.output, Nil, NoFlags, Seq.empty)
       case _ =>
-        // this check is added as we get MetastoreRelation in spark2.1,
-        // this is removed in later spark version
-        // TODO: this check can be removed once 2.1 support is removed from carbon
-        if (SparkUtil.isSparkVersionEqualTo("2.1") &&
-            plan.getClass.getName.equals("org.apache.spark.sql.hive.MetastoreRelation")) {
-          val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("catalogTable", plan)
-            .asInstanceOf[CatalogTable]
-          Some(catalogTable.database,
-            catalogTable.identifier.table,
-            plan.output,
-            Nil,
-            NoFlags,
-            Seq.empty)
-        } else {
           None
-        }
     }
   }
 }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index d3ce38d..366284b 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -204,7 +204,7 @@
                          s.child match {
                            case a: Alias =>
                              val qualifierPrefix = a.qualifier
-                               .map(_ + ".").getOrElse("")
+                               .map(_ + ".").headOption.getOrElse("")
                              s"$qualifierPrefix${
                                quoteIdentifier(a
                                  .name)
@@ -221,7 +221,7 @@
                        s.child match {
                          case a: Alias =>
                            val qualifierPrefix = a.qualifier.map(_ + ".")
-                             .getOrElse("")
+                             .headOption.getOrElse("")
                            s"$qualifierPrefix${ quoteIdentifier(a.name) }"
 
                          case other => other.sql
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
index b17eea2..c3a3a68 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -21,8 +21,11 @@
 
 import scala.collection.immutable
 
+import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.types.Metadata
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.mv.expressions.modular._
 import org.apache.carbondata.mv.plans._
@@ -116,18 +119,19 @@
                 if (i > -1) {
                   // this is a walk around for mystery of spark qualifier
                   if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
-                    AttributeReference(
-                      ref.name,
-                      ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
+                    CarbonToSparkAdapter.createAttributeReference(
+                      ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
+                      exprId = ref.exprId, qualifier = Some(aliasMap(i)))
                   } else {
                     ref
                   }
                 } else {
                   attrMap.get(ref) match {
                     case Some(alias) =>
-                      AttributeReference(
+                      CarbonToSparkAdapter.createAttributeReference(
                         alias.child.asInstanceOf[AttributeReference].name,
-                        ref.dataType)(exprId = ref.exprId,
+                        ref.dataType, nullable = true, metadata = Metadata.empty,
+                        exprId = ref.exprId,
                         alias.child.asInstanceOf[AttributeReference].qualifier)
                     case None => ref
                   }
@@ -178,13 +182,12 @@
                 list = list :+ ((index, subqueryName))
                 newS = newS.transformExpressions {
                   case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
-                    AttributeReference(ref.name, ref.dataType)(
-                      exprId = ref.exprId,
-                      qualifier = Some(subqueryName))
+                    CarbonToSparkAdapter.createAttributeReference(
+                      ref.name, ref.dataType, nullable = true, Metadata.empty,
+                      ref.exprId, Some(subqueryName))
                   case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
-                    Alias(alias.child, alias.name)(
-                      exprId = alias.exprId,
-                      qualifier = Some(subqueryName))
+                    CarbonToSparkAdapter.createAliasRef(
+                      alias.child, alias.name, alias.exprId, Some(subqueryName))
                 }
 
               case _ =>
@@ -212,13 +215,12 @@
             }
             newG.transformExpressions {
               case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
-                AttributeReference(ref.name, ref.dataType)(
-                  exprId = ref.exprId,
-                  qualifier = Some(subqueryName))
+                CarbonToSparkAdapter.createAttributeReference(
+                  ref.name, ref.dataType, nullable = true, Metadata.empty,
+                  ref.exprId, Some(subqueryName))
               case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
-                Alias(alias.child, alias.name)(
-                  exprId = alias.exprId,
-                  qualifier = Some(subqueryName))
+                CarbonToSparkAdapter.createAliasRef(
+                  alias.child, alias.name, alias.exprId, Some(subqueryName))
             }.copy(alias = Some(subqueryName))
         }
       }
diff --git a/docs/alluxio-guide.md b/docs/alluxio-guide.md
index b1bfeeb..bad1fc0 100644
--- a/docs/alluxio-guide.md
+++ b/docs/alluxio-guide.md
@@ -50,7 +50,7 @@
 ### Running spark-shell
  - Running the command in spark path
  ```$command
-./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
+./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
 ```
  - Testing use alluxio by CarbonSession
  ```$scala
@@ -98,7 +98,7 @@
 --master local \
 --jars ${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar,${CARBONDATA_PATH}/examples/spark2/target/carbondata-examples-1.6.0-SNAPSHOT.jar \
 --class org.apache.carbondata.examples.AlluxioExample \
-${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar \
+${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar \
 false
 ```
 **NOTE**: Please set runShell as false, which can avoid dependency on alluxio shell module.
diff --git a/docs/faq.md b/docs/faq.md
index 16cdfa5..88ca186 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -316,7 +316,7 @@
   2. Use the following command :
 
   ```
-  mvn -Pspark-2.1 -Dspark.version {yourSparkVersion} clean package
+  mvn -Pspark-2.4 -Dspark.version {yourSparkVersion} clean package
   ```
   
 Note : Refrain from using "mvn clean package" without specifying the profile.
diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 483585f..0c49f35 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -237,9 +237,9 @@
   $ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
   ```
   Replace the spark and hadoop version with the version used in your cluster.
-  For example, if you are using Spark 2.2.1 and Hadoop 2.7.2, you would like to compile using:
+  For example, if you are using Spark 2.4.4, you would like to compile using:
   ```
-  mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 -Dhadoop.version=2.7.2 clean package
+  mvn -DskipTests -Pspark-2.4 -Dspark.version=2.4.4 -Dhadoop.version=2.7.2 clean package
   ```
 
   Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index b66fce4..d007e03 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -37,11 +37,11 @@
     - [CLOSE STREAM](#close-stream)
 
 ## Quick example
-Download and unzip spark-2.2.0-bin-hadoop2.7.tgz, and export $SPARK_HOME
+Download and unzip spark-2.4.4-bin-hadoop2.7.tgz, and export $SPARK_HOME
 
-Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
+Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.6.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
 ```shell
-mvn clean package -DskipTests -Pspark-2.2
+mvn clean package -DskipTests -Pspark-2.4
 ```
 
 Start a socket data server in a terminal
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index b6921f2..d5c1188 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -37,9 +37,9 @@
       s"$rootPath/examples/spark2/src/main/resources/log4j.properties")
 
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
     val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
-    spark.sparkContext.setLogLevel("INFO")
+    spark.sparkContext.setLogLevel("error")
     exampleBody(spark)
     spark.close()
   }
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
index 73bf941..de3b5bc 100644
--- a/integration/flink/pom.xml
+++ b/integration/flink/pom.xml
@@ -184,26 +184,6 @@
 
     <profiles>
         <profile>
-            <id>spark-2.2</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.carbondata</groupId>
-                    <artifactId>carbondata-spark2</artifactId>
-                    <version>${project.version}</version>
-                    <scope>test</scope>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>org.apache.hive</groupId>
-                            <artifactId>hive-exec</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-            </dependencies>
-        </profile>
-        <profile>
             <id>spark-2.3</id>
             <activation>
                 <activeByDefault>true</activeByDefault>
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 199ff84..0790763 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -191,4 +191,103 @@
     </plugins>
   </build>
 
+  <profiles>
+    <profile>
+      <id>build-all</id>
+      <properties>
+        <spark.version>2.3.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+    </profile>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+    <profile>
+      <id>spark-2.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <spark.version>2.3.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.4</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.3</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.4</id>
+      <properties>
+        <spark.version>2.4.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.3</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.4</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index f629260..2548110 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,11 +37,12 @@
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.adapter.CarbonToSparkAdapter
 import org.apache.carbondata.spark.util.CommonUtil
 
 object CsvRDDHelper {
@@ -93,9 +94,9 @@
     def closePartition(): Unit = {
       if (currentFiles.nonEmpty) {
         val newPartition =
-          FilePartition(
+          CarbonToSparkAdapter.createFilePartition(
             partitions.size,
-            currentFiles.toArray.toSeq)
+            currentFiles)
         partitions += newPartition
       }
       currentFiles.clear()
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 13e7c45..a46568a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -17,23 +17,22 @@
 
 package org.apache.spark.sql.util
 
-import java.lang.reflect.Method
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EmptyRule
+import org.apache.spark.sql.catalyst.analysis.EliminateView
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.optimizer.{CheckCartesianProducts, EliminateOuterJoin, NullPropagation, PullupCorrelatedPredicates, RemoveRedundantAliases, ReorderJoin}
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -51,166 +50,60 @@
   }
 
   def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): Statistics = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val method: Method = logicalPlanObj.getClass.getMethod("stats", classOf[SQLConf])
-      method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val method: Method = logicalPlanObj.getClass.getMethod("stats")
-      method.invoke(logicalPlanObj).asInstanceOf[Statistics]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    logicalPlanObj.stats
   }
 
   def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
       : NamedExpression = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
-      clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
-        invoke(null, r, input).asInstanceOf[NamedExpression]
-    } else {
-      r
-    }
+    QueryPlan.normalizeExprId(r, input)
   }
 
   def getStatisticsObj(outputList: Seq[NamedExpression],
                        plan: LogicalPlan, stats: Statistics,
                        aliasMap: Option[AttributeMap[Attribute]] = None)
   : Statistics = {
-    val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val output = outputList.map(_.toAttribute)
-      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
-        table => AttributeMap(table.output.zip(output))
-      }
-      val rewrites = mapSeq.head
-      val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
-        getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
-      var attributeStats = AttributeMap(attributes.iterator
-        .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
-      if (aliasMap.isDefined) {
-        attributeStats = AttributeMap(
-          attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
-      }
-      val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
-      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
-        stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
-    } else {
-      val output = outputList.map(_.name)
-      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
-        table => table.output.map(_.name).zip(output).toMap
-      }
-      val rewrites = mapSeq.head
-      val colStats = CarbonReflectionUtils.getField("colStats", stats)
-        .asInstanceOf[Map[String, ColumnStat]]
-      var attributeStats = colStats.iterator
-        .map { pair => (rewrites(pair._1), pair._2) }.toMap
-      if (aliasMap.isDefined) {
-        val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
-        attributeStats =
-          attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
-            , pair._2))
-      }
-      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
-        stats.rowCount, attributeStats).asInstanceOf[Statistics]
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
     }
+    val rewrites = mapSeq.head
+    val attributes : AttributeMap[ColumnStat] = stats.attributeStats
+    var attributeStats = AttributeMap(attributes.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    if (aliasMap.isDefined) {
+      attributeStats = AttributeMap(
+        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+    }
+    val hints = stats.hints
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, hints)
   }
 
   def getEliminateViewObj(): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
-      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      EmptyRule
-    }
+    EliminateView
   }
 
   def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
-      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      EmptyRule
-    }
+    PullupCorrelatedPredicates
   }
 
   def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
-      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      EmptyRule
-    }
+    RemoveRedundantAliases
   }
 
   def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    }
-    else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    ReorderJoin
   }
 
   def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    }
-    else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    EliminateOuterJoin
   }
 
   def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    NullPropagation
   }
 
   def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
-      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
-        .asInstanceOf[Rule[LogicalPlan]]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
-      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
-    }
-    else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    CheckCartesianProducts
   }
 
   /**
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 46692df..93e66ea 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,24 +17,22 @@
 
 package org.apache.spark.util
 
-import java.lang.reflect.Method
-
 import scala.reflect.runtime._
 import scala.reflect.runtime.universe._
 
-import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.AstBuilder
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand}
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -60,45 +58,19 @@
   def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
     val im = rm.reflect(obj)
     im.symbol.typeSignature.members.find(_.name.toString.equals(name))
-      .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
+      .map(l => im.reflectField(l.asTerm).get).orNull
   }
 
   def getUnresolvedRelation(
       tableIdentifier: TableIdentifier,
       tableAlias: Option[String] = None): UnresolvedRelation = {
-    val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      createObject(
-        className,
-        tableIdentifier,
-        tableAlias)._1.asInstanceOf[UnresolvedRelation]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      createObject(
-        className,
-        tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
-    } else {
-      throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
-    }
+    UnresolvedRelation(tableIdentifier)
   }
 
   def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
       relation: LogicalPlan,
       view: Option[TableIdentifier]): SubqueryAlias = {
-    val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      createObject(
-        className,
-        alias.getOrElse(""),
-        relation,
-        Option(view))._1.asInstanceOf[SubqueryAlias]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      createObject(
-        className,
-        alias.getOrElse(""),
-        relation)._1.asInstanceOf[SubqueryAlias]
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark version")
-    }
+    SubqueryAlias(alias.getOrElse(""), relation)
   }
 
   def getInsertIntoCommand(table: LogicalPlan,
@@ -106,58 +78,23 @@
       query: LogicalPlan,
       overwrite: Boolean,
       ifPartitionNotExists: Boolean): InsertIntoTable = {
-    val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val overwriteOptions = createObject(
-        "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
-        overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
-      createObject(
-        className,
-        table,
-        partition,
-        query,
-        overwriteOptions,
-        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2") ) {
-      createObject(
-        className,
-        table,
-        partition,
-        query,
-        overwrite.asInstanceOf[Object],
-        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark version")
-    }
+    InsertIntoTable(
+      table,
+      partition,
+      query,
+      overwrite,
+      ifPartitionNotExists)
   }
 
   def getLogicalRelation(relation: BaseRelation,
       expectedOutputAttributes: Seq[Attribute],
       catalogTable: Option[CatalogTable],
       isStreaming: Boolean): LogicalRelation = {
-    val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      createObject(
-        className,
-        relation,
-        Some(expectedOutputAttributes),
-        catalogTable)._1.asInstanceOf[LogicalRelation]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      createObject(
-        className,
-        relation,
-        expectedOutputAttributes,
-        catalogTable)._1.asInstanceOf[LogicalRelation]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
-      createObject(
-        className,
-        relation,
-        expectedOutputAttributes,
-        catalogTable,
-        isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark version")
-    }
+    new LogicalRelation(
+      relation,
+      expectedOutputAttributes.asInstanceOf[Seq[AttributeReference]],
+      catalogTable,
+      isStreaming)
   }
 
 
@@ -208,46 +145,28 @@
   def getSessionState(sparkContext: SparkContext,
       carbonSession: Object,
       useHiveMetaStore: Boolean): Any = {
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+    if (useHiveMetaStore) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-        "org.apache.spark.sql.hive.CarbonSessionState")
-      createObject(className, carbonSession)._1
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      if (useHiveMetaStore) {
-        val className = sparkContext.conf.get(
-          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-          "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
-        val tuple = createObject(className, carbonSession, None)
-        val method = tuple._2.getMethod("build")
-        method.invoke(tuple._1)
-      } else {
-        val className = sparkContext.conf.get(
-          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-          "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
-        val tuple = createObject(className, carbonSession, None)
-        val method = tuple._2.getMethod("build")
-        method.invoke(tuple._1)
-      }
+        "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
+      val tuple = createObject(className, carbonSession, None)
+      val method = tuple._2.getMethod("build")
+      method.invoke(tuple._1)
     } else {
-      throw new UnsupportedOperationException("Spark version not supported")
+      val className = sparkContext.conf.get(
+        CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+        "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
+      val tuple = createObject(className, carbonSession, None)
+      val method = tuple._2.getMethod("build")
+      method.invoke(tuple._1)
     }
   }
 
   def hasPredicateSubquery(filterExp: Expression) : Boolean = {
-    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-      val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
-      val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression])
-      val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
-      hasSubquery
-    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-      val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
-      val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
-      val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
-      hasSubquery
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
+    val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
+    val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
+    hasSubquery
   }
 
   def getDescribeTableFormattedField[T: TypeTag : reflect.ClassTag](obj: T): Boolean = {
@@ -265,19 +184,10 @@
       rdd: RDD[InternalRow],
       partition: Partitioning,
       metadata: Map[String, String]): RowDataSourceScanExec = {
-    val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
-    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
-      createObject(className, output, rdd, relation.relation,
-        partition, metadata,
-        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
-    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      createObject(className, output, output.map(output.indexOf),
-        pushedFilters.toSet, handledFilters.toSet, rdd,
-        relation.relation,
-        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    RowDataSourceScanExec(output, output.map(output.indexOf),
+      pushedFilters.toSet, handledFilters.toSet, rdd,
+      relation.relation,
+      relation.catalogTable.map(_.identifier))
   }
 
   def invokewriteAndReadMethod(dataSourceObj: DataSource,
@@ -287,25 +197,7 @@
       mode: SaveMode,
       query: LogicalPlan,
       physicalPlan: SparkPlan): BaseRelation = {
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val method: Method = dataSourceObj.getClass
-        .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
-      method.invoke(dataSourceObj, mode, dataFrame)
-        .asInstanceOf[BaseRelation]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
-      val method: Method = dataSourceObj.getClass
-        .getMethod("writeAndRead",
-          classOf[SaveMode],
-          classOf[LogicalPlan],
-          classOf[Seq[String]],
-          classOf[SparkPlan])
-      // since spark 2.3.2 version (SPARK-PR#22346),
-      // change 'query.output' to 'query.output.map(_.name)'
-      method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
-        .asInstanceOf[BaseRelation]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
   }
 
   /**
@@ -316,9 +208,7 @@
    */
   def invokeAlterTableAddColumn(table: TableIdentifier,
       colsToAdd: Seq[StructField]): Object = {
-    val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
-    CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
-      ._1.asInstanceOf[RunnableCommand]
+    AlterTableAddColumnsCommand(table, colsToAdd)
   }
 
   def createSingleObject(className: String): Any = {
@@ -385,16 +275,6 @@
 
   def invokeAnalyzerExecute(analyzer: Analyzer,
       plan: LogicalPlan): LogicalPlan = {
-    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
-      val method: Method = analyzer.getClass
-        .getMethod("execute", classOf[LogicalPlan])
-      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
-    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
-      val method: Method = analyzer.getClass
-        .getMethod("executeAndCheck", classOf[LogicalPlan])
-      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    analyzer.executeAndCheck(plan)
   }
 }
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala b/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
similarity index 65%
rename from integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
rename to integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
index 4abf189..a2ca9e6 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
+++ b/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.carbondata.spark.adapter
 
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import scala.collection.mutable.ArrayBuffer
 
-/**
-  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
-  * recognize a subquery as such, and it allows us to write subquery aware transformations.
-  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
\ No newline at end of file
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+
+object CarbonToSparkAdapter {
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+    FilePartition(index, files.toArray.toSeq)
+  }
+}
diff --git a/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
new file mode 100644
index 0000000..be82907
--- /dev/null
+++ b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.adapter
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+    FilePartition(index, files.toArray)
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Option[String],
+      attrRef : NamedExpression = null): AttributeReference = {
+    val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qf)
+  }
+
+  def createAliasRef(
+      child: Expression,
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Option[String] = None,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr : Option[NamedExpression] = None ) : Alias = {
+    Alias(child, name)(
+      exprId,
+      if (qualifier.nonEmpty) Seq(qualifier.get) else Seq(),
+      explicitMetadata)
+  }
+}
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index cda1954..1f1cac3 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -192,86 +192,6 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-2.1</id>
-      <properties>
-        <spark.version>2.1.0</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.3plus</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.1andspark2.2</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.2</id>
-      <properties>
-        <spark.version>2.2.1</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.3plus</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.1andspark2.2</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>spark-2.3</id>
       <activation>
         <activeByDefault>true</activeByDefault>
@@ -286,30 +206,6 @@
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.1andspark2.2</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.3plus</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
           </plugin>
         </plugins>
       </build>
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
deleted file mode 100644
index 605df66..0000000
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.io.api.Binary;
-
-public class CarbonDictionaryWrapper extends Dictionary {
-
-  private Binary[] binaries;
-
-  CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
-    super(encoding);
-    binaries = new Binary[dictionary.getDictionarySize()];
-    for (int i = 0; i < binaries.length; i++) {
-      binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
-    }
-  }
-
-  @Override
-  public int getMaxId() {
-    return binaries.length - 1;
-  }
-
-  @Override
-  public Binary decodeToBinary(int id) {
-    return binaries[id];
-  }
-}
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 7d23d7c..0000000
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import java.lang.reflect.Field;
-import java.math.BigInteger;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
-
-import org.apache.parquet.column.Encoding;
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.3 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-  private ColumnarBatch columnarBatch;
-  private ColumnVectorProxy[] columnVectorProxies;
-
-
-  private void updateColumnVectors() {
-    try {
-      Field field = columnarBatch.getClass().getDeclaredField("columns");
-      field.setAccessible(true);
-      field.set(columnarBatch, columnVectorProxies);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Adapter class which handles the columnar vector reading of the carbondata
-   * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-   * handles the complexity of spark 2.3 version related api changes since
-   * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-   *
-   * @param memMode       which represent the type onheap or offheap vector.
-   * @param outputSchema, metadata related to current schema of table.
-   * @param rowNum        rows number for vector reading
-   * @param useLazyLoad   Whether to use lazy load while getting the data.
-   */
-  public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
-      boolean useLazyLoad) {
-    columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
-    columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
-    for (int i = 0; i < columnVectorProxies.length; i++) {
-      if (useLazyLoad) {
-        columnVectorProxies[i] =
-            new ColumnVectorProxyWithLazyLoad(columnarBatch.column(i), rowNum, memMode);
-      } else {
-        columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
-      }
-    }
-    updateColumnVectors();
-  }
-
-  public ColumnVectorProxy getColumnVector(int ordinal) {
-    return columnVectorProxies[ordinal];
-  }
-
-  /**
-   * Returns the number of rows for read, including filtered rows.
-   */
-  public int numRows() {
-    return columnarBatch.capacity();
-  }
-
-  /**
-   * This API will return a columnvector from a batch of column vector rows
-   * based on the ordinal
-   *
-   * @param ordinal
-   * @return
-   */
-  public ColumnVector column(int ordinal) {
-    return columnarBatch.column(ordinal);
-  }
-
-  /**
-   * Resets this column for writing. The currently stored values are no longer accessible.
-   */
-  public void reset() {
-    for (int i = 0; i < columnarBatch.numCols(); i++) {
-      ((ColumnVectorProxy) columnarBatch.column(i)).reset();
-    }
-  }
-
-  public void resetDictionaryIds(int ordinal) {
-    (((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
-  }
-
-  /**
-   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-   */
-  public InternalRow getRow(int rowId) {
-    return columnarBatch.getRow(rowId);
-  }
-
-  /**
-   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-   */
-  public Object getColumnarBatch() {
-    return columnarBatch;
-  }
-
-  /**
-   * Called to close all the columns in this batch. It is not valid to access the data after
-   * calling this. This must be called at the end to clean up memory allocations.
-   */
-  public void close() {
-    columnarBatch.close();
-  }
-
-  /**
-   * Sets the number of rows in this batch.
-   */
-  public void setNumRows(int numRows) {
-    columnarBatch.setNumRows(numRows);
-  }
-
-  public DataType dataType(int ordinal) {
-    return columnarBatch.column(ordinal).dataType();
-  }
-
-  public static class ColumnVectorProxy extends ColumnVector {
-
-    private ColumnVector vector;
-
-    public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
-      super(capacity, columnVector.dataType(), mode);
-      try {
-        Field childColumns =
-            columnVector.getClass().getSuperclass().getDeclaredField("childColumns");
-        childColumns.setAccessible(true);
-        Object o = childColumns.get(columnVector);
-        childColumns.set(this, o);
-        Field resultArray =
-            columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
-        resultArray.setAccessible(true);
-        Object o1 = resultArray.get(columnVector);
-        resultArray.set(this, o1);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      vector = columnVector;
-    }
-
-    public void putRowToColumnBatch(int rowId, Object value) {
-      org.apache.spark.sql.types.DataType t = dataType();
-      if (null == value) {
-        putNull(rowId);
-      } else {
-        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-          putBoolean(rowId, (boolean) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-          putByte(rowId, (byte) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-          putShort(rowId, (short) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-          putInt(rowId, (int) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-          putLong(rowId, (long) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-          putFloat(rowId, (float) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-          putDouble(rowId, (double) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-          UTF8String v = (UTF8String) value;
-          putByteArray(rowId, v.getBytes());
-        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-          DecimalType dt = (DecimalType) t;
-          Decimal d = Decimal.fromDecimal(value);
-          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-            putInt(rowId, (int) d.toUnscaledLong());
-          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-            putLong(rowId, d.toUnscaledLong());
-          } else {
-            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-            byte[] bytes = integer.toByteArray();
-            putByteArray(rowId, bytes, 0, bytes.length);
-          }
-        } else if (t instanceof CalendarIntervalType) {
-          CalendarInterval c = (CalendarInterval) value;
-          vector.getChildColumn(0).putInt(rowId, c.months);
-          vector.getChildColumn(1).putLong(rowId, c.microseconds);
-        } else if (t instanceof org.apache.spark.sql.types.DateType) {
-          putInt(rowId, (int) value);
-        } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-          putLong(rowId, (long) value);
-        }
-      }
-    }
-
-    public void putBoolean(int rowId, boolean value) {
-      vector.putBoolean(rowId, value);
-    }
-
-    public void putByte(int rowId, byte value) {
-      vector.putByte(rowId, value);
-    }
-
-    public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putBytes(rowId, count, src, srcIndex);
-    }
-
-    public void putShort(int rowId, short value) {
-      vector.putShort(rowId, value);
-    }
-
-    public void putInt(int rowId, int value) {
-      vector.putInt(rowId, value);
-    }
-
-    public void putFloat(int rowId, float value) {
-      vector.putFloat(rowId, value);
-    }
-
-    public void putFloats(int rowId, int count, float[] src, int srcIndex) {
-      vector.putFloats(rowId, count, src, srcIndex);
-    }
-
-    public void putLong(int rowId, long value) {
-      vector.putLong(rowId, value);
-    }
-
-    public void putDouble(int rowId, double value) {
-      vector.putDouble(rowId, value);
-    }
-
-    public void putInts(int rowId, int count, int value) {
-      vector.putInts(rowId, count, value);
-    }
-
-    public void putInts(int rowId, int count, int[] src, int srcIndex) {
-      vector.putInts(rowId, count, src, srcIndex);
-    }
-
-    public void putShorts(int rowId, int count, short value) {
-      vector.putShorts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short[] src, int srcIndex) {
-      vector.putShorts(rowId, count, src, srcIndex);
-    }
-
-    public void putLongs(int rowId, int count, long value) {
-      vector.putLongs(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long[] src, int srcIndex) {
-      vector.putLongs(rowId, count, src, srcIndex);
-    }
-
-    public void putDoubles(int rowId, int count, double value) {
-      vector.putDoubles(rowId, count, value);
-    }
-
-    public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-      vector.putDoubles(rowId, count, src, srcIndex);
-    }
-
-    public DataType dataType(int ordinal) {
-      return vector.dataType();
-    }
-
-    public void putNotNull(int rowId) {
-      vector.putNotNull(rowId);
-    }
-
-    public void putNotNulls(int rowId, int count) {
-      vector.putNotNulls(rowId, count);
-    }
-
-    public void putDictionaryInt(int rowId, int value) {
-      vector.getDictionaryIds().putInt(rowId, value);
-    }
-
-    public void setDictionary(CarbonDictionary dictionary) {
-      if (null != dictionary) {
-        CarbonDictionaryWrapper dictionaryWrapper =
-            new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
-        vector.setDictionary(dictionaryWrapper);
-        this.dictionary = dictionaryWrapper;
-      } else {
-        this.dictionary = null;
-        vector.setDictionary(null);
-      }
-    }
-
-    public void putNull(int rowId) {
-      vector.putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count) {
-      vector.putNulls(rowId, count);
-    }
-
-    public boolean hasDictionary() {
-      return vector.hasDictionary();
-    }
-
-    public ColumnVector reserveDictionaryIds(int capacity) {
-      this.dictionaryIds = vector.reserveDictionaryIds(capacity);
-      return dictionaryIds;
-    }
-
-    @Override
-    public boolean isNullAt(int i) {
-      return vector.isNullAt(i);
-    }
-
-    @Override
-    public boolean getBoolean(int i) {
-      return vector.getBoolean(i);
-    }
-
-    @Override
-    public byte getByte(int i) {
-      return vector.getByte(i);
-    }
-
-    @Override
-    public short getShort(int i) {
-      return vector.getShort(i);
-    }
-
-    @Override
-    public int getInt(int i) {
-      return vector.getInt(i);
-    }
-
-    @Override
-    public long getLong(int i) {
-      return vector.getLong(i);
-    }
-
-    @Override
-    public float getFloat(int i) {
-      return vector.getFloat(i);
-    }
-
-    @Override
-    public double getDouble(int i) {
-      return vector.getDouble(i);
-    }
-
-    @Override
-    protected void reserveInternal(int capacity) {
-    }
-
-    @Override
-    public void reserve(int requiredCapacity) {
-      vector.reserve(requiredCapacity);
-    }
-
-    @Override
-    public long nullsNativeAddress() {
-      return vector.nullsNativeAddress();
-    }
-
-    @Override
-    public long valuesNativeAddress() {
-      return vector.valuesNativeAddress();
-    }
-
-    @Override
-    public void putBooleans(int rowId, int count, boolean value) {
-      vector.putBooleans(rowId, count, value);
-    }
-
-    @Override
-    public void putBytes(int rowId, int count, byte value) {
-      vector.putBytes(rowId, count, value);
-    }
-
-    @Override
-    public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putIntsLittleEndian(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public int getDictId(int rowId) {
-      return vector.getDictId(rowId);
-    }
-
-    @Override
-    public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putLongsLittleEndian(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public void putFloats(int rowId, int count, float value) {
-      vector.putFloats(rowId, count, value);
-    }
-
-    @Override
-    public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putFloats(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-      vector.putDoubles(rowId, count, src, srcIndex);
-    }
-
-    @Override
-    public void putArray(int rowId, int offset, int length) {
-      vector.putArray(rowId, offset, length);
-    }
-
-    @Override
-    public int getArrayLength(int rowId) {
-      return vector.getArrayLength(rowId);
-    }
-
-    @Override
-    public int getArrayOffset(int rowId) {
-      return vector.getArrayOffset(rowId);
-    }
-
-    @Override
-    public void loadBytes(Array array) {
-      vector.loadBytes(array);
-    }
-
-    @Override
-    public int putByteArray(int rowId, byte[] value, int offset, int count) {
-      return vector.putByteArray(rowId, value, offset, count);
-    }
-
-    /**
-     * It keeps all binary data of all rows to it.
-     * Should use along with @{putArray(int rowId, int offset, int length)} to keep lengths
-     * and offset.
-     */
-    public void putAllByteArray(byte[] data, int offset, int length) {
-      vector.arrayData().appendBytes(length, data, offset);
-    }
-
-    @Override
-    public void close() {
-      vector.close();
-    }
-
-    public void reset() {
-      if (isConstant) {
-        return;
-      }
-      vector.reset();
-    }
-
-    public void setLazyPage(LazyPageLoader lazyPage) {
-      lazyPage.loadPage();
-    }
-
-    public ColumnVector getVector() {
-      return vector;
-    }
-  }
-
-  public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
-
-    private ColumnVector vector;
-
-    private LazyPageLoader pageLoad;
-
-    private boolean isLoaded;
-
-    public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector, int capacity, MemoryMode mode) {
-      super(columnVector, capacity, mode);
-      vector = columnVector;
-    }
-
-    @Override
-    public boolean isNullAt(int i) {
-      checkPageLoaded();
-      return vector.isNullAt(i);
-    }
-
-    @Override
-    public boolean getBoolean(int i) {
-      checkPageLoaded();
-      return vector.getBoolean(i);
-    }
-
-    @Override
-    public byte getByte(int i) {
-      checkPageLoaded();
-      return vector.getByte(i);
-    }
-
-    @Override
-    public short getShort(int i) {
-      checkPageLoaded();
-      return vector.getShort(i);
-    }
-
-    @Override
-    public int getInt(int i) {
-      checkPageLoaded();
-      return vector.getInt(i);
-    }
-
-    @Override
-    public long getLong(int i) {
-      checkPageLoaded();
-      return vector.getLong(i);
-    }
-
-    @Override
-    public float getFloat(int i) {
-      checkPageLoaded();
-      return vector.getFloat(i);
-    }
-
-    @Override
-    public double getDouble(int i) {
-      checkPageLoaded();
-      return vector.getDouble(i);
-    }
-
-    @Override
-    public int getArrayLength(int rowId) {
-      checkPageLoaded();
-      return vector.getArrayLength(rowId);
-    }
-
-    @Override
-    public int getArrayOffset(int rowId) {
-      checkPageLoaded();
-      return vector.getArrayOffset(rowId);
-    }
-
-    private void checkPageLoaded() {
-      if (!isLoaded) {
-        if (pageLoad != null) {
-          pageLoad.loadPage();
-        }
-        isLoaded = true;
-      }
-    }
-
-    public void reset() {
-      if (isConstant) {
-        return;
-      }
-      isLoaded = false;
-      vector.reset();
-    }
-
-    public void setLazyPage(LazyPageLoader lazyPage) {
-      this.pageLoad = lazyPage;
-    }
-
-  }
-}
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 7b65e0b..dfebf6f 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -253,92 +253,6 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-2.1</id>
-      <properties>
-        <spark.version>2.1.0</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.2</exclude>
-                <exclude>src/main/spark2.3</exclude>
-                <exclude>src/main/commonTo2.2And2.3</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.1</source>
-                    <source>src/main/commonTo2.1And2.2</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.2</id>
-      <properties>
-        <spark.version>2.2.1</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.1</exclude>
-                <exclude>src/main/spark2.3</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.2</source>
-                    <source>src/main/commonTo2.2And2.3</source>
-                    <source>src/main/commonTo2.1And2.2</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>spark-2.3</id>
       <activation>
         <activeByDefault>true</activeByDefault>
@@ -355,9 +269,7 @@
             <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
               <excludes>
-                <exclude>src/main/spark2.1</exclude>
-                <exclude>src/main/spark2.2</exclude>
-                <exclude>src/main/commonTo2.1And2.2</exclude>
+                <exclude>src/main/spark2.4</exclude>
               </excludes>
             </configuration>
           </plugin>
@@ -375,7 +287,46 @@
                 <configuration>
                   <sources>
                     <source>src/main/spark2.3</source>
-                    <source>src/main/commonTo2.2And2.3</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.4</id>
+      <properties>
+        <spark.version>2.4.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.3</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.4</source>
                   </sources>
                 </configuration>
               </execution>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index aa650e0..78d6a46 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -17,13 +17,8 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
 
 case class CastExpr(expr: Expression) extends Filter {
   override def references: Array[String] = null
@@ -33,26 +28,6 @@
   override def references: Array[String] = null
 }
 
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
-  extends LeafExpression with NamedExpression with CodegenFallback {
-
-  type EvaluatedType = Any
-
-  override def toString: String = s"input[" + colExp.getColIndex + "]"
-
-  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
-  override def name: String = colExp.getColumnName
-
-  override def toAttribute: Attribute = throw new UnsupportedOperationException
-
-  override def exprId: ExprId = throw new UnsupportedOperationException
-
-  override def qualifier: Option[String] = null
-
-  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
-
 case class CarbonEndsWith(expr: Expression) extends Filter {
   override def references: Array[String] = null
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index e020a99..23c078a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -258,41 +258,67 @@
             s"""
                |org.apache.spark.sql.DictTuple $value = $decodeDecimal($dictRef, ${ev.value});
                  """.stripMargin
-            ExprCode(code, s"$value.getIsNull()",
-              s"((org.apache.spark.sql.types.Decimal)$value.getValue())")
+            CarbonToSparkAdapter.createExprCode(
+              code,
+              s"$value.getIsNull()",
+              s"((org.apache.spark.sql.types.Decimal)$value.getValue())",
+              expr.dataType)
           } else {
             getDictionaryColumnIds(index)._3.getDataType match {
               case CarbonDataTypes.INT => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeInt($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Integer)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Integer)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.SHORT => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeShort($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Short)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Short)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.DOUBLE => code +=
                  s"""
                     |org.apache.spark.sql.DictTuple $value = $decodeDouble($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Double)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Double)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.LONG => code +=
                  s"""
                     |org.apache.spark.sql.DictTuple $value = $decodeLong($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Long)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Long)$value.getValue())",
+                  expr.dataType)
               case CarbonDataTypes.BOOLEAN => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeBool($dictRef, ${ ev.value });
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((Boolean)$value.getValue())")
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((Boolean)$value.getValue())",
+                  expr.dataType)
               case _ => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeStr($dictRef, ${ev.value});
                  """.stripMargin
-                ExprCode(code, s"$value.getIsNull()", s"((UTF8String)$value.getValue())")
-
+                CarbonToSparkAdapter.createExprCode(
+                  code,
+                  s"$value.getIsNull()",
+                  s"((UTF8String)$value.getValue())",
+                  expr.dataType)
             }
           }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 376d121..703df20 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -40,11 +40,9 @@
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamingQueryListener, StreamSinkFactory}
@@ -183,12 +181,9 @@
         LOGGER.warn("Carbon Table [" +dbName +"] [" +tableName +"] is not found, " +
           "Now existing Schema will be overwritten with default properties")
         val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-        val identifier = AbsoluteTableIdentifier.from(
-          CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),
-          dbName,
-          tableName)
+        val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
         val updatedParams = CarbonSource.updateAndCreateTable(
-          identifier, dataSchema, sparkSession, metaStore, parameters, None)
+          dbName, tableName, tablePath, dataSchema, sparkSession, metaStore, parameters, None)
         (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
@@ -278,9 +273,10 @@
   lazy val listenerAdded = new mutable.HashMap[Int, Boolean]()
 
   def createTableInfoFromParams(
+      databaseName: String,
+      tableName: String,
       parameters: Map[String, String],
       dataSchema: StructType,
-      identifier: AbsoluteTableIdentifier,
       query: Option[LogicalPlan],
       sparkSession: SparkSession): TableModel = {
     val sqlParser = new CarbonSpark2SqlParser
@@ -301,8 +297,8 @@
         sqlParser.getFields(dataSchema)
     }
     val bucketFields = sqlParser.getBucketFields(map, fields, options)
-    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
-      identifier.getTableName, fields, Nil, map, bucketFields)
+    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(databaseName),
+      tableName, fields, Nil, map, bucketFields)
   }
 
   /**
@@ -314,7 +310,8 @@
   def updateCatalogTableWithCarbonSchema(
       tableDesc: CatalogTable,
       sparkSession: SparkSession,
-      query: Option[LogicalPlan] = None): CatalogTable = {
+      query: Option[LogicalPlan] = None,
+      persistSchema: Boolean = true): CatalogTable = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
@@ -322,14 +319,16 @@
       val tablePath = CarbonEnv.getTablePath(
         tableDesc.identifier.database, tableDesc.identifier.table)(sparkSession)
       val dbName = CarbonEnv.getDatabaseName(tableDesc.identifier.database)(sparkSession)
-      val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableDesc.identifier.table)
       val map = updateAndCreateTable(
-        identifier,
+        dbName,
+        tableDesc.identifier.table,
+        tablePath,
         tableDesc.schema,
         sparkSession,
         metaStore,
         properties,
-        query)
+        query,
+        persistSchema)
       // updating params
       val updatedFormat = CarbonToSparkAdapter
         .getUpdatedStorageFormat(storageFormat, map, tablePath)
@@ -351,36 +350,56 @@
     }
   }
 
+  def createTableInfo(
+      databaseName: String,
+      tableName: String,
+      tablePath: String,
+      dataSchema: StructType,
+      properties: Map[String, String],
+      query: Option[LogicalPlan],
+      sparkSession: SparkSession
+  ): TableInfo = {
+    val model = createTableInfoFromParams(
+      databaseName, tableName, properties, dataSchema, query, sparkSession)
+    val tableInfo = TableNewProcessor(model)
+    val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
+    tableInfo.setTablePath(tablePath)
+    tableInfo.setTransactionalTable(isTransactionalTable)
+    tableInfo.setDatabaseName(databaseName)
+    val schemaEvolutionEntry = new SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    tableInfo
+  }
+
   def updateAndCreateTable(
-      identifier: AbsoluteTableIdentifier,
+      databaseName: String,
+      tableName: String,
+      tablePath: String,
       dataSchema: StructType,
       sparkSession: SparkSession,
       metaStore: CarbonMetaStore,
       properties: Map[String, String],
-      query: Option[LogicalPlan]): Map[String, String] = {
-    val model = createTableInfoFromParams(properties, dataSchema, identifier, query, sparkSession)
-    val tableInfo: TableInfo = TableNewProcessor(model)
-    val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
-    tableInfo.setTablePath(identifier.getTablePath)
-    tableInfo.setTransactionalTable(isTransactionalTable)
-    tableInfo.setDatabaseName(identifier.getDatabaseName)
-    val schemaEvolutionEntry = new SchemaEvolutionEntry
-    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    val map = if (!metaStore.isReadFromHiveMetaStore && isTransactionalTable) {
-      metaStore.saveToDisk(tableInfo, identifier.getTablePath)
+      query: Option[LogicalPlan],
+      persistSchema: Boolean = true): Map[String, String] = {
+    val tableInfo = createTableInfo(
+      databaseName, tableName, tablePath, dataSchema, properties, query, sparkSession)
+   val map = if (persistSchema &&
+                 !metaStore.isReadFromHiveMetaStore &&
+                 tableInfo.isTransactionalTable) {
+      metaStore.saveToDisk(tableInfo, tablePath)
       new java.util.HashMap[String, String]()
     } else {
       CarbonUtil.convertToMultiStringMap(tableInfo)
     }
     properties.foreach(e => map.put(e._1, e._2))
-    map.put("tablepath", identifier.getTablePath)
-    map.put("dbname", identifier.getDatabaseName)
+    map.put("tablepath", tablePath)
+    map.put("dbname", databaseName)
     if (map.containsKey("tableName")) {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
       LOGGER.warn("tableName is not required in options, ignoring it")
     }
-    map.put("tableName", identifier.getTableName)
+    map.put("tableName", tableName)
     map.asScala.toMap
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
index 8a37989..233f28d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -38,5 +38,5 @@
 
   override def genCode(ctx: CodegenContext): ExprCode = nonDt.genCode(ctx)
 
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy()
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index ded87b9..9f203c8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -164,6 +164,7 @@
   }
 
   private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
+    LOGGER.info("Trying to drop DataMap from system folder")
     try {
       if (dataMapSchema == null) {
         dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 130580d..b90faa7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -690,13 +690,13 @@
             attr.dataType
         }
       }
-      CarbonToSparkAdapter.createAttributeReference(attr.name,
+      CarbonToSparkAdapter.createAttributeReference(
+        attr.name,
         updatedDataType,
         attr.nullable,
         attr.metadata,
         attr.exprId,
-        attr.qualifier,
-        attr)
+        attr.qualifier)
     }
     // Only select the required columns
     var output = if (partition.nonEmpty) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index c12ff6c..d3f8079 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -97,12 +97,12 @@
           carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
-      // In case of spark2.2 and above and , when we call
+      // when we call
       // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
       // in case of add column, spark gets the catalog table and then it itself adds the partition
       // columns if the table is partition table for all the new data schema sent by carbon,
       // so there will be duplicate partition columns, so send the columns without partition columns
-      val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
+      val cols = if (carbonTable.isHivePartitionTable) {
         val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
         val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains
         (col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 7e66d34..cf05a9d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -163,7 +163,7 @@
       if (isDataTypeChange) {
         // if column datatype change operation is on partition column, then fail the datatype change
         // operation
-        if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
+        if (null != carbonTable.getPartitionInfo) {
           val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
           partitionColumns.asScala.foreach {
             col =>
@@ -286,14 +286,13 @@
     // update the schema changed column at the specific index in carbonColumns based on schema order
     carbonColumns
       .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
-    // In case of spark2.2 and above and , when we call
+    // When we call
     // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
     // in case of rename column or change datatype, spark gets the catalog table and then it itself
     // adds the partition columns if the table is partition table for all the new data schema sent
     // by carbon, so there will be duplicate partition columns, so send the columns without
     // partition columns
-    val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
-                      carbonTable.isHivePartitionTable) {
+    val columns = if (carbonTable.isHivePartitionTable) {
       val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
       val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains(
         col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index bdc0228..4cb8a2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -151,13 +151,12 @@
       val cols = carbonTable.getCreateOrderColumn().asScala
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
         .filterNot(column => delCols.contains(column))
-      // In case of spark2.2 and above and , when we call
+      // When we call
       // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
       // in case of drop column, spark gets the catalog table and then it itself adds the partition
       // columns if the table is partition table for all the new data schema sent by carbon,
       // so there will be duplicate partition columns, so send the columns without partition columns
-      val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
-                        carbonTable.isHivePartitionTable) {
+      val columns = if (carbonTable.isHivePartitionTable) {
         val partitionColumns = partitionInfo.getColumnSchemaList.asScala
         val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
         Some(carbonColumnsWithoutPartition)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
new file mode 100644
index 0000000..f36e1ac
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import org.apache.spark.sql.{CarbonEnv, CarbonSource, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, MetadataCommand}
+
+/**
+ * Command to create table in case of 'USING CARBONDATA' DDL
+ *
+ * @param catalogTable catalog table created by spark
+ * @param ignoreIfExists ignore if table exists
+ * @param sparkSession spark session
+ */
+case class CarbonCreateDataSourceTableCommand(
+    catalogTable: CatalogTable,
+    ignoreIfExists: Boolean,
+    sparkSession: SparkSession)
+  extends MetadataCommand {
+
+  override def processMetadata(session: SparkSession): Seq[Row] = {
+    // Run the spark command to create table in metastore before saving carbon schema
+    // in table path.
+    // This is required for spark 2.4, because spark 2.4 will fail to create table
+    // if table path is created before hand
+    val updatedCatalogTable =
+      CarbonSource.updateCatalogTableWithCarbonSchema(catalogTable, session, None, false)
+    val sparkCommand = CreateDataSourceTableCommand(updatedCatalogTable, ignoreIfExists)
+    sparkCommand.run(session)
+
+    // save the table info (carbondata's schema) in table path
+    val tableName = catalogTable.identifier.table
+    val dbName = CarbonEnv.getDatabaseName(catalogTable.identifier.database)(session)
+    val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(session)
+    val tableInfo = CarbonSource.createTableInfo(
+      dbName, tableName, tablePath, catalogTable.schema, catalogTable.properties, None, session)
+    val metastore = CarbonEnv.getInstance(session).carbonMetaStore
+    metastore.saveToDisk(tableInfo, tablePath)
+
+    Seq.empty
+  }
+
+  override protected def opName: String = "CREATE TABLE USING CARBONDATA"
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 4adb1aa..2a5d78b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -207,8 +207,9 @@
             } catch {
               case _: Exception => // No operation
             }
+            throw e
             val msg = s"Create table'$tableName' in database '$dbName' failed"
-            throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
+            throwMetadataException(dbName, tableName, s"$msg, ${e.getMessage}")
         }
       }
       val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 54a5757..8700b29 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -134,6 +134,7 @@
       }
       val indexDatamapSchemas =
         DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+      LOGGER.info(s"Dropping DataMaps in table $tableName, size: " + indexDatamapSchemas.size())
       if (!indexDatamapSchemas.isEmpty) {
         childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
           val command = CarbonDropDataMapCommand(schema.getDataMapName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 2e1f91f..8acc749 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -142,10 +142,13 @@
     val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
       logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
     val attrs = projectExprsNeedToDecode.map { attr =>
-      val newAttr = AttributeReference(attr.name,
+      val newAttr = CarbonToSparkAdapter.createAttributeReference(
+        attr.name,
         attr.dataType,
         attr.nullable,
-        attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
+        attr.metadata,
+        attr.exprId,
+        Option(table.carbonRelation.tableName))
       relation.addAttribute(newAttr)
       newAttr
     }
@@ -194,8 +197,7 @@
               attr.nullable,
               attr.metadata,
               attr.exprId,
-              attr.qualifier,
-              attr)
+              attr.qualifier)
         }
       }
       partitions =
@@ -403,7 +405,7 @@
             newProjectList :+= reference
             a.transform {
               case s: ScalaUDF =>
-                ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+                CarbonToSparkAdapter.createScalaUDF(s, reference)
             }
           case other => other
       }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index a851bc3..49a3d3b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,7 @@
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
 import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand}
 import org.apache.spark.sql.execution.command.schema._
-import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateDataSourceTableCommand, CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
@@ -171,7 +171,7 @@
             ExecutedCommandExec(addColumn) :: Nil
           }
           // TODO: remove this else if check once the 2.1 version is unsupported by carbon
-        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+        } else {
           val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
             .map {
               a =>
@@ -185,8 +185,6 @@
             .invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
           Nil
           // TODO: remove this else check once the 2.1 version is unsupported by carbon
-        } else {
-          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
       case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -275,9 +273,12 @@
         if table.provider.get != DDLUtils.HIVE_PROVIDER
           && (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
           || table.provider.get.equalsIgnoreCase("carbondata")) =>
-        val updatedCatalog = CarbonSource
-          .updateCatalogTableWithCarbonSchema(table, sparkSession)
-        val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
+        val cmd = if (SparkUtil.isSparkVersionEqualTo("2.4")) {
+          CarbonCreateDataSourceTableCommand(table, ignoreIfExists, sparkSession)
+        } else {
+          val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
+          CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
+        }
         ExecutedCommandExec(cmd) :: Nil
       case AlterTableSetPropertiesCommand(tableName, properties, isView)
         if CarbonEnv.getInstance(sparkSession).carbonMetaStore
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index fd7defa..5e82eb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -26,8 +26,8 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
-import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 5323293..eda131f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -78,9 +78,7 @@
             "Update operation is not supported for mv datamap table")
         }
       }
-      val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-        relation
-      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val tableRelation =
         alias match {
           case Some(_) =>
             CarbonReflectionUtils.getSubqueryAlias(
@@ -90,9 +88,6 @@
               Some(table.tableIdentifier))
           case _ => relation
         }
-      } else {
-        throw new UnsupportedOperationException("Unsupported Spark version.")
-      }
 
       CarbonReflectionUtils.getSubqueryAlias(
         sparkSession,
@@ -221,21 +216,15 @@
           }
         }
         // include tuple id in subquery
-        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-          Project(projList, relation)
-        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
-          alias match {
-            case Some(_) =>
-              val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
-                sparkSession,
-                alias,
-                relation,
-                Some(table.tableIdentifier))
-              Project(projList, subqueryAlias)
-            case _ => Project(projList, relation)
-          }
-        } else {
-          throw new UnsupportedOperationException("Unsupported Spark version.")
+        alias match {
+          case Some(_) =>
+            val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
+              sparkSession,
+              alias,
+              relation,
+              Some(table.tableIdentifier))
+            Project(projList, subqueryAlias)
+          case _ => Project(projList, relation)
         }
     }
     CarbonProjectForDeleteCommand(
@@ -314,13 +303,7 @@
         }
       }
       val newChild: LogicalPlan = if (newChildOutput == child.output) {
-        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
-          CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
-        } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-          CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
-        } else {
-          throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
-        }
+        throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
       } else {
         Project(newChildOutput, child)
       }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
similarity index 99%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
index 8f4d45e..04beea7 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -47,4 +47,4 @@
       logicalPlan
     }
   }
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b2ba7f4..849d7ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -200,8 +200,7 @@
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
       case SubqueryAlias(_, c)
-        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
-           (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
@@ -524,8 +523,7 @@
       carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDataSourceHadoopRelation
       case SubqueryAlias(_, c)
-        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
-           (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName
               .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
similarity index 68%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
index 72d3ae2..e19966f 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.CarbonDatasourceHadoopRelation
@@ -29,7 +46,7 @@
             }
             Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
 
-          case In(value, Seq(l:ListQuery)) =>
+          case In(value, Seq(l: ListQuery)) =>
             val tPlan = l.plan.transform {
               case lr: LogicalRelation
                 if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
@@ -41,4 +58,4 @@
     }
     transFormedPlan
   }
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 9b3ff87..68c293b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -18,11 +18,11 @@
 
 import java.util.LinkedHashSet
 
-import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CarbonMetastoreTypes, SparkTypeConverter}
@@ -119,7 +119,8 @@
             val dataType = SparkTypeConverter.addDecimalScaleAndPrecision(column, dType)
             CarbonMetastoreTypes.toDataType(dataType)
         }
-        AttributeReference(column.getColName, output, nullable = true )(
+        CarbonToSparkAdapter.createAttributeReference(
+          column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
           qualifier = Option(tableName + "." + column.getColName))
       } else {
         val output = CarbonMetastoreTypes.toDataType {
@@ -129,7 +130,8 @@
             case others => others
           }
         }
-        AttributeReference(column.getColName, output, nullable = true)(
+        CarbonToSparkAdapter.createAttributeReference(
+          column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
           qualifier = Option(tableName + "." + column.getColName))
       }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index 20d43df..dcba730 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -26,11 +26,9 @@
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
 
 /**
- * This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration,
+ * This interface defines those common api used by carbon for spark integration,
  * but are not defined in SessionCatalog or HiveSessionCatalog to give contract to the
  * Concrete implementation classes.
- * For example CarbonSessionCatalog defined in 2.1 and 2.2.
- *
  */
 @InterfaceAudience.Internal
 @InterfaceStability.Stable
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
similarity index 92%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index f78c785..44b3bfd 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -27,12 +27,11 @@
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
@@ -61,8 +60,8 @@
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
   extends HiveSessionCatalog (
-    externalCatalog,
-    globalTempViewManager,
+    SparkAdapter.getExternalCatalogCatalog(externalCatalog),
+    SparkAdapter.getGlobalTempViewManager(globalTempViewManager),
     new HiveMetastoreCatalog(sparkSession),
     functionRegistry,
     conf,
@@ -111,8 +110,9 @@
    * @return
    */
   override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
-      .asInstanceOf[HiveExternalCatalog].client
+    SparkAdapter.getHiveExternalCatalog(
+      sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+    ).client
   }
 
   override def alterAddColumns(tableIdentifier: TableIdentifier,
@@ -174,9 +174,10 @@
    * @param identifier
    * @return
    */
-  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+  override def getPartitionsAlternate(
+      partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
     CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
   }
 
@@ -233,14 +234,14 @@
   }
 
   private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+    SparkAdapter.getHiveExternalCatalog(session.sharedState.externalCatalog)
 
   /**
    * Create a Hive aware resource loader.
    */
   override protected lazy val resourceLoader: HiveSessionResourceLoader = {
     val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
+    new HiveSessionResourceLoader(session, SparkAdapter.getHiveClient(client))
   }
 
   override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
@@ -274,4 +275,4 @@
   }
 
   override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
similarity index 85%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e3f1d3f..e9bcb43 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -1,39 +1,36 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql.hive
 
-import java.util.concurrent.Callable
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.util.SparkTypeConverter
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.util.SparkTypeConverter
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 /**
  * This class refresh the relation from cache if the carbontable in
@@ -177,8 +174,7 @@
 
   def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case sa@SubqueryAlias(_,
-      MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
+      case sa@SubqueryAlias(_, MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
         sa.copy(child = sa.child.asInstanceOf[LogicalRelation].copy())
       case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
         plan.asInstanceOf[LogicalRelation].copy()
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
similarity index 93%
rename from integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index 73b6790..36ec2c8 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -38,8 +38,8 @@
         fileStorage.equalsIgnoreCase("'carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
       val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
-        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),ctx.locationSpec(0),
-        Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
+        ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
       helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateHiveTable(ctx)
@@ -47,6 +47,6 @@
   }
 
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser,ctx)
+    visitAddTableColumns(parser, ctx)
   }
 }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
similarity index 100%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
similarity index 98%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index ee9fb0f..0335b36 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,12 +19,12 @@
 
 import java.net.URI
 
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand}
 import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.util.CarbonReflectionUtils
 
 /**
@@ -49,7 +48,7 @@
     Seq.empty
   }
 
-  override def processData(sparkSession: SparkSession): Seq[Row] ={
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
 
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
similarity index 100%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 2765c5f..e288e6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -315,15 +315,13 @@
    * @return
    */
   def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
-    var isCompatible = true
-    if (SparkUtil.isSparkVersionXandAbove("2.3")) {
-      val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
-        .asInstanceOf[Option[Expression]]
-      if (trimStr.isDefined) {
-        isCompatible = false
-      }
+    val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+      .asInstanceOf[Option[Expression]]
+    if (trimStr.isDefined) {
+      false
+    } else {
+      true
     }
-    isCompatible
   }
 
   def transformExpression(expr: Expression): CarbonExpression = {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 10b661a..4866301 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -305,7 +305,7 @@
           }
         }
         val (selectStmt, relation) =
-          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
+          if (selectPattern.findFirstIn(sel.toLowerCase).isEmpty ||
               !StringUtils.isEmpty(subQueryResults)) {
             // if subQueryResults are not empty means, it is not join with main table.
             // so use subQueryResults in update with value flow.
@@ -348,8 +348,6 @@
         rel
     }
 
-
-
   private def updateRelation(
       r: UnresolvedRelation,
       tableIdent: Seq[String],
@@ -362,8 +360,6 @@
           case Seq(dbName, tableName) => Some(tableName)
           case Seq(tableName) => Some(tableName)
         }
-        // Use Reflection to choose between Spark2.1 and Spark2.2
-        // Move UnresolvedRelation(tableIdentifier, tableAlias) to reflection.
         CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias)
     }
   }
@@ -386,8 +382,6 @@
 
         val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
 
-        // Use Reflection to choose between Spark2.1 and Spark2.2
-        // Move (UnresolvedRelation(tableIdent, alias), tableIdent, alias) to reflection.
         val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias)
 
         (unresolvedRelation, tableIdent, alias, tableIdentifier)
@@ -678,6 +672,7 @@
     val name = field.column.toLowerCase
     field.copy(column = name, name = Some(name))
   }
+
   protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ DROP ~ COLUMNS ~
     ("(" ~> rep1sep(ident, ",") <~ ")") <~ opt(";") ^^ {
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
deleted file mode 100644
index 79a6240..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-        SparkSession.sqlListener.set(null)
-      }
-    })
-  }
-
-  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier,attrRef.isGenerated)
-  }
-
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr: Option[NamedExpression] = None): Alias = {
-    val isGenerated:Boolean = if (namedExpr.isDefined) {
-      namedExpr.get.isGenerated
-    } else {
-      false
-    }
-    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation)
-  }
-
-  def getPartitionKeyFilter(
-      partitionSet: AttributeSet,
-      filterPredicates: Seq[Expression]): ExpressionSet = {
-    ExpressionSet(
-      ExpressionSet(filterPredicates)
-        .filter(_.references.subsetOf(partitionSet)))
-  }
-
-  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
-      map: Map[String, String],
-      tablePath: String): CatalogStorageFormat = {
-    storageFormat.copy(properties = map, locationUri = Some(tablePath))
-  }
-
-  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
-    Seq(OptimizeCodegen(conf))
-  }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
deleted file mode 100644
index 69ed477..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation}
-import org.apache.spark.sql.types.StructType
-
-object MixedFormatHandlerUtil {
-
-  def getScanForSegments(
-      @transient relation: HadoopFsRelation,
-      output: Seq[Attribute],
-      outputSchema: StructType,
-      partitionFilters: Seq[Expression],
-      dataFilters: Seq[Expression],
-      tableIdentifier: Option[TableIdentifier]
-  ): FileSourceScanExec = {
-    val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
-    FileSourceScanExec(
-      relation,
-      output,
-      outputSchema,
-      partitionFilters,
-      pushedDownFilters,
-      tableIdentifier)
-  }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
deleted file mode 100644
index eb3e88d..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.catalog
-
-import com.google.common.base.Objects
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
-  * A `LogicalPlan` that represents a hive table.
-  *
-  * TODO: remove this after we completely make hive as a data source.
-  */
-case class HiveTableRelation(
-                              tableMeta: CatalogTable,
-                              dataCols: Seq[AttributeReference],
-                              partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
-  assert(tableMeta.identifier.database.isDefined)
-  assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
-  assert(tableMeta.schema.sameType(dataCols.toStructType))
-
-  // The partition column should always appear after data columns.
-  override def output: Seq[AttributeReference] = dataCols ++ partitionCols
-
-  def isPartitioned: Boolean = partitionCols.nonEmpty
-
-  override def equals(relation: Any): Boolean = relation match {
-    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    Objects.hashCode(tableMeta.identifier, output)
-  }
-
-  override def newInstance(): HiveTableRelation = copy(
-    dataCols = dataCols.map(_.newInstance()),
-    partitionCols = partitionCols.map(_.newInstance()))
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
deleted file mode 100644
index 9a88255..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
-import org.apache.spark.sql.catalyst.expressions.aggregate.First
-import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-class MigrateOptimizer {
-
-}
-
-/**
-  * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
-  */
-object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Deduplicate(keys, child, streaming) if !streaming =>
-      val keyExprIds = keys.map(_.exprId)
-      val aggCols = child.output.map { attr =>
-        if (keyExprIds.contains(attr.exprId)) {
-          attr
-        } else {
-          Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
-        }
-      }
-      Aggregate(keys, aggCols, child)
-  }
-}
-
-/** A logical plan for `dropDuplicates`. */
-case class Deduplicate(
-                        keys: Seq[Attribute],
-                        child: LogicalPlan,
-                        streaming: Boolean) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
-}
-
-/**
-  * Remove projections from the query plan that do not make any modifications.
-  */
-object RemoveRedundantProject extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case p @ Project(_, child) if p.output == child.output => child
-  }
-}
-
-/**
-  * push down operations into [[CreateNamedStructLike]].
-  */
-object SimplifyCreateStructOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformExpressionsUp {
-      // push down field extraction
-      case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
-        createNamedStructLike.valExprs(ordinal)
-    }
-  }
-}
-
-/**
-  * push down operations into [[CreateArray]].
-  */
-object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformExpressionsUp {
-      // push down field selection (array of structs)
-      case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
-        // instead f selecting the field on the entire array,
-        // select it from each member of the array.
-        // pushing down the operation this way open other optimizations opportunities
-        // (i.e. struct(...,x,...).x)
-        CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
-      // push down item selection.
-      case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
-        // instead of creating the array and then selecting one row,
-        // remove array creation altgether.
-        if (idx >= 0 && idx < elems.size) {
-          // valid index
-          elems(idx)
-        } else {
-          // out of bounds, mimic the runtime behavior and return null
-          Literal(null, ga.dataType)
-        }
-    }
-  }
-}
-
-/**
-  * push down operations into [[CreateMap]].
-  */
-object SimplifyCreateMapOps extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformExpressionsUp {
-      case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
-    }
-  }
-}
-
-
-/**
-  * Removes MapObjects when the following conditions are satisfied
-  *   1. Mapobject(... lambdavariable(..., false) ...), which means types for input and output
-  *      are primitive types with non-nullable
-  *   2. no custom collection class specified representation of data item.
-  */
-object EliminateMapObjects extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
-    case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData
-  }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
deleted file mode 100644
index 8eb05fc..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-        SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To set carbon task distribution.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    val BAD_RECORDS_LOGGER_ENABLE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BAD_RECORD_PATH =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-  /**
-   * to set the dynamic properties default values
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index 6b94806..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.spark.util.CarbonReflectionUtils
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonHiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    sparkSession: SparkSession,
-    functionResourceLoader: FunctionResourceLoader,
-    functionRegistry: FunctionRegistry,
-    conf: SQLConf,
-    hadoopConf: Configuration)
-  extends HiveSessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
-    sparkSession,
-    functionResourceLoader,
-    functionRegistry,
-    conf,
-    hadoopConf) with CarbonSessionCatalog {
-
-  private lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-  /**
-   * return's the carbonEnv instance
-   * @return
-   */
-  override def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.init
-
-  /**
-   * This method will invalidate carbonrelation from cache if carbon table is updated in
-   * carbon catalog
-   *
-   * @param name
-   * @param alias
-   * @return
-   */
-  override def lookupRelation(name: TableIdentifier,
-      alias: Option[String]): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name, alias)
-    var toRefreshRelation = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
-        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
-      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
-      case _ =>
-    }
-
-    if (toRefreshRelation) {
-      super.lookupRelation(name, alias)
-    } else {
-      rtnRelation
-    }
-  }
-
-  /**
-   * returns hive client from session state
-   *
-   * @return
-   */
-  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
-      super.createPartitions(tableName, updatedParts, ignoreIfExists)
-    } catch {
-      case e: Exception =>
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
-    val partitionSchema = catalogTable.partitionSchema
-    if (partitionFilters.nonEmpty) {
-      val boundPredicate =
-        InterpretedPredicate.create(partitionFilters.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
-      allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
-    } else {
-      allPartitions
-    }
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toString))
-  }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- * @param sparkSession
- */
-class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies = extraStrategies
-
-  experimentalMethods.extraOptimizations = extraOptimizations
-
-  def extraStrategies: Seq[Strategy] = {
-    Seq(
-      new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  }
-
-  def extraOptimizations: Seq[Rule[LogicalPlan]] = {
-    Seq(new CarbonIUDRule,
-      new CarbonUDFTransformRule,
-      new CarbonLateDecodeRule)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
-  def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
-    catalog.ParquetConversions ::
-    catalog.OrcConversions ::
-    CarbonPreInsertionCasts(sparkSession) ::
-    CarbonIUDAnalysisRule(sparkSession) ::
-    AnalyzeCreateTable(sparkSession) ::
-    PreprocessTableInsertion(conf) ::
-    DataSourceAnalysis(conf) ::
-    (if (conf.runSQLonFile) {
-      new ResolveDataSource(sparkSession) :: Nil
-    } else {  Nil })
-  }
-
-  override lazy val analyzer: Analyzer =
-    new CarbonAnalyzer(catalog, conf, sparkSession,
-      new Analyzer(catalog, conf) {
-        override val extendedResolutionRules =
-          if (extendedAnalyzerRules.nonEmpty) {
-            extendedAnalyzerRules ++ internalAnalyzerRules
-          } else {
-            internalAnalyzerRules
-          }
-        override val extendedCheckRules = Seq(
-          PreWriteCheck(conf, catalog))
-      }
-  )
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-    new CarbonHiveSessionCatalog(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-      sparkSession.sharedState.globalTempViewManager,
-      sparkSession,
-      functionResourceLoader,
-      functionRegistry,
-      conf,
-      newHadoopConf())
-  }
-}
-
-class CarbonAnalyzer(catalog: SessionCatalog,
-    conf: CatalystConf,
-    sparkSession: SparkSession,
-    analyzer: Analyzer) extends Analyzer(catalog, conf) {
-
-  val mvPlan = try {
-    CarbonReflectionUtils.createObject(
-      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
-      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
-  } catch {
-    case e: Exception =>
-      null
-  }
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    val logicalPlan = analyzer.execute(plan)
-    if (mvPlan != null) {
-      mvPlan.apply(logicalPlan)
-    } else {
-      logicalPlan
-    }
-  }
-}
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
-    super.execute(transFormedPlan)
-  }
-}
-
-object CarbonOptimizerUtil {
-  def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
-    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule,
-    // And optimize whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case p: PredicateSubquery =>
-            val tPlan = p.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
-        }
-    }
-    transFormedPlan
-  }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("carbondata") ||
-        fileStorage.equalsIgnoreCase("'carbonfile'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
-        ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
-        Option(ctx.STRING()).map(string),
-        ctx.AS, ctx.query, fileStorage)
-        helper.createCarbonTable(createTableTuple)
-    } else {
-      super.visitCreateTable(ctx)
-    }
-  }
-
-  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
-    withOrigin(ctx) {
-      if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
-          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
-        super.visitShowTables(ctx)
-      } else {
-        CarbonShowTablesCommand(
-          Option(ctx.db).map(_.getText),
-          Option(ctx.pattern).map(string))
-      }
-    }
-  }
-
-  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
-    CarbonExplainCommand(super.visitExplain(ctx))
-  }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
deleted file mode 100644
index dd690e4..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{
-  CatalogRelation, CatalogTable, CatalogTableType,
-  SimpleCatalogRelation
-}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{
-  AlterTableRecoverPartitionsCommand, DDLUtils,
-  RunnableCommand
-}
-import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.StructType
-
-/**
- * Create table 'using carbondata' and insert the query result into it.
- * @param table the Catalog Table
- * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
- * @param query the query whose result will be insert into the new relation
- */
-
-case class CreateCarbonSourceTableAsSelectCommand(
-    table: CatalogTable,
-    mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
-
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    assert(table.tableType != CatalogTableType.VIEW)
-    assert(table.provider.isDefined)
-    assert(table.schema.isEmpty)
-
-    val provider = table.provider.get
-    val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    val tableName = tableIdentWithDB.unquotedString
-
-    var createMetastoreTable = false
-    var existingSchema = Option.empty[StructType]
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
-      // Check if we need to throw an exception or just return.
-      mode match {
-        case SaveMode.ErrorIfExists =>
-          throw new AnalysisException(s"Table $tableName already exists. " +
-                                      s"If you are using saveAsTable, you can set SaveMode to " +
-                                      s"SaveMode.Append to " +
-                                      s"insert data into the table or set SaveMode to SaveMode" +
-                                      s".Overwrite to overwrite" +
-                                      s"the existing data. " +
-                                      s"Or, if you are using SQL CREATE TABLE, you need to drop " +
-                                      s"$tableName first.")
-        case SaveMode.Ignore =>
-          // Since the table already exists and the save mode is Ignore, we will just return.
-          return Seq.empty[Row]
-        case SaveMode.Append =>
-          // Check if the specified data source match the data source of the existing table.
-          val existingProvider = DataSource.lookupDataSource(provider)
-          // TODO: Check that options from the resolved relation match the relation that we are
-          // inserting into (i.e. using the same compression).
-
-          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
-          // views unexpectedly.
-          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
-            case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
-              // check if the file formats match
-              l.relation match {
-                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
-                  throw new AnalysisException(
-                    s"The file format of the existing table $tableName is " +
-                    s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
-                    s"format `$provider`")
-                case _ =>
-              }
-              if (query.schema.size != l.schema.size) {
-                throw new AnalysisException(
-                  s"The column number of the existing schema[${ l.schema }] " +
-                  s"doesn't match the data schema[${ query.schema }]'s")
-              }
-              existingSchema = Some(l.schema)
-            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-              existingSchema = Some(s.metadata.schema)
-            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
-              throw new AnalysisException("Saving data in the Hive serde table " +
-                                          s"${ c.catalogTable.identifier } is not supported yet. " +
-                                          s"Please use the insertInto() API as an alternative..")
-            case o =>
-              throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
-          }
-        case SaveMode.Overwrite =>
-          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
-          // Need to create the table again.
-          createMetastoreTable = true
-      }
-    } else {
-      // The table does not exist. We need to create it in metastore.
-      createMetastoreTable = true
-    }
-
-    val data = Dataset.ofRows(sparkSession, query)
-    val df = existingSchema match {
-      // If we are inserting into an existing table, just use the existing schema.
-      case Some(s) => data.selectExpr(s.fieldNames: _*)
-      case None => data
-    }
-
-    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-      Some(sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.locationUri
-    }
-
-    // Create the relation based on the data of df.
-    val pathOption = tableLocation.map("path" -> _)
-    val dataSource = DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = table.partitionColumnNames,
-      bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
-      catalogTable = Some(table))
-
-    val result = try {
-      dataSource.write(mode, df)
-    } catch {
-      case ex: AnalysisException =>
-        logError(s"Failed to write to table $tableName in $mode mode", ex)
-        throw ex
-    }
-    result match {
-      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
-                                   sparkSession.sqlContext.conf.manageFilesourcePartitions =>
-        // Need to recover partitions into the metastore so our saved data is visible.
-        sparkSession.sessionState.executePlan(
-          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
-      case _ =>
-    }
-
-    // Refresh the cache of the table in the catalog.
-    sessionState.catalog.refreshTable(tableIdentWithDB)
-    Seq.empty[Row]
-  }
-}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
deleted file mode 100644
index 446b5a5..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
-import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-        SparkSession.sqlListener.set(null)
-      }
-    })
-  }
-
-  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier,attrRef.isGenerated)
-  }
-
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr: Option[NamedExpression] = None): Alias = {
-    val isGenerated:Boolean = if (namedExpr.isDefined) {
-      namedExpr.get.isGenerated
-    } else {
-      false
-    }
-    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation)
-  }
-
-  def getPartitionKeyFilter(
-      partitionSet: AttributeSet,
-      filterPredicates: Seq[Expression]): ExpressionSet = {
-    ExpressionSet(
-      ExpressionSet(filterPredicates)
-        .filter(_.references.subsetOf(partitionSet)))
-  }
-
-  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
-    Seq(OptimizeCodegen(conf))
-  }
-
-  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
-      map: Map[String, String],
-      tablePath: String): CatalogStorageFormat = {
-    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
-  }
-}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
deleted file mode 100644
index 7177f1a..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("carbondata") ||
-        fileStorage.equalsIgnoreCase("'carbonfile'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
-        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
-        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
-      helper.createCarbonTable(createTableTuple)
-    } else {
-      super.visitCreateHiveTable(ctx)
-    }
-  }
-
-  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser,ctx)
-  }
-}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
new file mode 100644
index 0000000..d134de1
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
@@ -0,0 +1,28 @@
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
+
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
+  extends LeafExpression with NamedExpression with CodegenFallback {
+
+  type EvaluatedType = Any
+
+  override def toString: String = s"input[" + colExp.getColIndex + "]"
+
+  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
+
+  override def name: String = colExp.getColumnName
+
+  override def toAttribute: Attribute = throw new UnsupportedOperationException
+
+  override def exprId: ExprId = throw new UnsupportedOperationException
+
+  override def qualifier: Option[String] = null
+
+  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
+}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 9094dfe..7003c26 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -23,11 +23,11 @@
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, Metadata}
 
 object CarbonToSparkAdapter {
@@ -41,8 +41,8 @@
   }
 
   def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-                               metadata: Metadata, exprId: ExprId, qualifier: Option[String],
-                               attrRef : NamedExpression): AttributeReference = {
+      metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+      attrRef : NamedExpression = null): AttributeReference = {
     AttributeReference(
       name,
       dataType,
@@ -50,14 +50,23 @@
       metadata)(exprId, qualifier)
   }
 
-  def createAliasRef(child: Expression,
-                     name: String,
-                     exprId: ExprId = NamedExpression.newExprId,
-                     qualifier: Option[String] = None,
-                     explicitMetadata: Option[Metadata] = None,
-                     namedExpr : Option[NamedExpression] = None ) : Alias = {
+  def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
+    ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+  }
 
-      Alias(child, name)(exprId, qualifier, explicitMetadata)
+  def createExprCode(code: String, isNull: String, value: String, dataType: DataType = null
+  ): ExprCode = {
+    ExprCode(code, isNull, value)
+  }
+
+  def createAliasRef(child: Expression,
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Option[String] = None,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr : Option[NamedExpression] = None ) : Alias = {
+
+    Alias(child, name)(exprId, qualifier, explicitMetadata)
   }
 
   def getExplainCommandObj() : ExplainCommand = {
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
new file mode 100644
index 0000000..bffb900
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -0,0 +1,27 @@
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+
+object MixedFormatHandlerUtil {
+
+  def getScanForSegments(
+      @transient relation: HadoopFsRelation,
+      output: Seq[Attribute],
+      outputSchema: StructType,
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression],
+      tableIdentifier: Option[TableIdentifier]
+  ): FileSourceScanExec = {
+    FileSourceScanExec(
+      relation,
+      output,
+      outputSchema,
+      partitionFilters,
+      dataFilters,
+      tableIdentifier)
+  }
+}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 5435f04..25d5543 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
similarity index 97%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
rename to integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
index 36f166d..4ad3b11 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -21,21 +21,20 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
new file mode 100644
index 0000000..61959c2
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
@@ -0,0 +1,14 @@
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager}
+import org.apache.spark.sql.hive.client.HiveClient
+
+object SparkAdapter {
+  def getExternalCatalogCatalog(catalog: HiveExternalCatalog) = catalog
+
+  def getGlobalTempViewManager(manager: GlobalTempViewManager) = manager
+
+  def getHiveClient(client: HiveClient) = client
+
+  def getHiveExternalCatalog(catalog: ExternalCatalog) = catalog.asInstanceOf[HiveExternalCatalog]
+}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
new file mode 100644
index 0000000..ee4c9ce
--- /dev/null
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
@@ -0,0 +1,44 @@
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
+  extends LeafExpression with NamedExpression with CodegenFallback {
+
+  type EvaluatedType = Any
+
+  override def toString: String = s"input[" + colExp.getColIndex + "]"
+
+  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
+
+  override def name: String = colExp.getColumnName
+
+  override def toAttribute: Attribute = throw new UnsupportedOperationException
+
+  override def exprId: ExprId = throw new UnsupportedOperationException
+
+  override def qualifier: Seq[String] = null
+
+  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
new file mode 100644
index 0000000..082d1ec
--- /dev/null
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+      }
+    })
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Option[String],
+      attrRef : NamedExpression = null): AttributeReference = {
+    val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qf)
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Seq[String]): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier)
+  }
+
+  def createScalaUDF(s: ScalaUDF, reference: AttributeReference) = {
+    ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
+  }
+
+  def createExprCode(code: String, isNull: String, value: String, dataType: DataType) = {
+    ExprCode(
+      code"$code",
+      JavaCode.isNullVariable(isNull),
+      JavaCode.variable(value, dataType))
+  }
+
+  def createAliasRef(
+      child: Expression,
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Seq[String] = Seq.empty,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr: Option[NamedExpression] = None) : Alias = {
+    Alias(child, name)(exprId, qualifier, explicitMetadata)
+  }
+
+  def createAliasRef(
+      child: Expression,
+      name: String,
+      exprId: ExprId,
+      qualifier: Option[String]) : Alias = {
+    Alias(child, name)(exprId,
+      if (qualifier.isEmpty) Seq.empty else Seq(qualifier.get),
+      None)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation())
+  }
+
+  /**
+   * As a part of SPARK-24085 Hive tables supports scala subquery for
+   * parition tables, so Carbon also needs to supports
+   * @param partitionSet
+   * @param filterPredicates
+   * @return
+   */
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filterNot(SubqueryExpression.hasSubquery)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+
+  // As per SPARK-22520 OptimizeCodegen is removed in 2.3.1
+  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq.empty
+  }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+  }
+}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
similarity index 98%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
index d180cd3..7b20c06 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -37,6 +38,7 @@
       output,
       outputSchema,
       partitionFilters,
+      None,
       dataFilters,
       tableIdentifier)
   }
diff --git a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
similarity index 81%
rename from integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 7605574..60ee7ea 100644
--- a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 
@@ -32,7 +32,7 @@
     val rdd: RDD[InternalRow],
     @transient override val relation: HadoopFsRelation,
     val partitioning: Partitioning,
-    override val metadata: Map[String, String],
+    val md: Map[String, String],
     identifier: Option[TableIdentifier],
     @transient private val logicalRelation: LogicalRelation)
   extends FileSourceScanExec(
@@ -40,14 +40,20 @@
     output,
     relation.dataSchema,
     Seq.empty,
+    None,
     Seq.empty,
     identifier) {
 
-  override val supportsBatch: Boolean = true
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val supportsBatch: Boolean = true
 
-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
     (partitioning, Nil)
 
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val metadata: Map[String, String] = md
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
 
 }
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
similarity index 95%
rename from integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
index 2f9ad3e..808099d 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -23,15 +23,14 @@
 import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.internal.SQLConf
 
-
 class CarbonOptimizer(
     catalog: SessionCatalog,
     conf: SQLConf,
     experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+  extends SparkOptimizer(catalog, experimentalMethods) {
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
     val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
     super.execute(transFormedPlan)
   }
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
new file mode 100644
index 0000000..8132188
--- /dev/null
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener, GlobalTempViewManager}
+import org.apache.spark.sql.hive.client.HiveClient
+
+object SparkAdapter {
+  def getExternalCatalogCatalog(catalog: HiveExternalCatalog) =
+    () => catalog
+
+  def getGlobalTempViewManager(manager: GlobalTempViewManager) =
+    () => manager
+
+  def getHiveClient(client: HiveClient) =
+    () => client
+
+  def getHiveExternalCatalog(catalog: ExternalCatalogWithListener) =
+    catalog.unwrapped.asInstanceOf[HiveExternalCatalog]
+}
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
index c1a5862..e7ca31b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -24,6 +24,7 @@
   val dataMapName = "bloom_dm"
 
   override protected def beforeAll(): Unit = {
+    sqlContext.sparkContext.setLogLevel("info")
     deleteFile(bigFile)
     new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
     createFile(bigFile, line = 2000)
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
deleted file mode 100644
index e69de29..0000000
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ /dev/null
diff --git a/pom.xml b/pom.xml
index 66214df..f74a11b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -487,7 +487,7 @@
 
   <profiles>
     <profile>
-      <!--This profile does not build spark module, so user should explicitly give spark profile also like spark-2.1 -->
+      <!--This profile does not build spark module, so user should explicitly give spark profile -->
       <id>build-with-format</id>
       <modules>
         <module>format</module>
@@ -501,103 +501,6 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-2.1</id>
-      <properties>
-        <spark.version>2.1.0</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.eluder.coveralls</groupId>
-            <artifactId>coveralls-maven-plugin</artifactId>
-            <version>4.3.0</version>
-            <configuration>
-              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
-              <sourceEncoding>UTF-8</sourceEncoding>
-              <jacocoReports>
-                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
-                </jacocoReport>
-              </jacocoReports>
-              <sourceDirectories>
-                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
-              </sourceDirectories>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.2</id>
-      <properties>
-        <spark.version>2.2.1</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.eluder.coveralls</groupId>
-            <artifactId>coveralls-maven-plugin</artifactId>
-            <version>4.3.0</version>
-            <configuration>
-              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
-              <sourceEncoding>UTF-8</sourceEncoding>
-              <jacocoReports>
-                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
-                </jacocoReport>
-              </jacocoReports>
-              <sourceDirectories>
-                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
-              </sourceDirectories>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>spark-2.3</id>
       <activation>
         <activeByDefault>true</activeByDefault>
@@ -626,11 +529,64 @@
                 <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/below2.4</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.3</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/below2.4</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
+              </sourceDirectories>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.4</id>
+      <properties>
+        <spark.version>2.4.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.eluder.coveralls</groupId>
+            <artifactId>coveralls-maven-plugin</artifactId>
+            <version>4.3.0</version>
+            <configuration>
+              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
+              <sourceEncoding>UTF-8</sourceEncoding>
+              <jacocoReports>
+                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
+                </jacocoReport>
+              </jacocoReports>
+              <sourceDirectories>
+                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.4</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/spark2.4</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>