Revert "[CARBONDATA-3514] Support Spark 2.4.4 integration"

This reverts commit ba35a02da4f2b2ab86cdafbfe60356134187dc57.
diff --git a/README.md b/README.md
index 2f661ed..a34784d 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,8 @@
 
 
 ## Status
-Spark2.3:
-[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.3)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
+Spark2.2:
+[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.2)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
 [![Coverage Status](https://coveralls.io/repos/github/apache/carbondata/badge.svg?branch=master)](https://coveralls.io/github/apache/carbondata?branch=master)
 <a href="https://scan.coverity.com/projects/carbondata">
   <img alt="Coverity Scan Build Status"
diff --git a/build/README.md b/build/README.md
index 960ccce..f361a6e 100644
--- a/build/README.md
+++ b/build/README.md
@@ -25,9 +25,11 @@
 * [Apache Thrift 0.9.3](http://archive.apache.org/dist/thrift/0.9.3/)
 
 ## Build command
-Build with different supported versions of Spark, by default using Spark 2.4.4
+Build with different supported versions of Spark, by default using Spark 2.2.1 to build
 ```
-mvn -DskipTests -Pspark-2.4 clean package
+mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package
+mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 clean package
+mvn -DskipTests -Pspark-2.3 -Dspark.version=2.3.2 clean package
 ```
 
 Note:
@@ -37,5 +39,5 @@
 ## For contributors : To build the format code after any changes, please follow the below command.
 Note:Need install Apache Thrift 0.9.3
 ```
-mvn clean -DskipTests -Pbuild-with-format -Pspark-2.4 package
+mvn clean -DskipTests -Pbuild-with-format -Pspark-2.2 package
 ```
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index c18298d..d0e5a42 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -27,11 +27,11 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -42,15 +42,12 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.Logger;
 
 /**
  * Stores datamap schema in disk as json format
  */
 public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStorageProvider {
 
-  private Logger LOG = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
-
   private String storePath;
 
   private String mdtFilePath;
@@ -174,15 +171,17 @@
     if (!FileFactory.isFileExist(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
     }
-
-    LOG.info(String.format("Trying to delete DataMap %s schema", dataMapName));
-
-    dataMapSchemas.removeIf(schema -> schema.getDataMapName().equalsIgnoreCase(dataMapName));
+    Iterator<DataMapSchema> iterator = dataMapSchemas.iterator();
+    while (iterator.hasNext()) {
+      DataMapSchema schema = iterator.next();
+      if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) {
+        iterator.remove();
+      }
+    }
     touchMDTFile();
     if (!FileFactory.deleteFile(schemaPath)) {
       throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
     }
-    LOG.info(String.format("DataMap %s schema is deleted", dataMapName));
   }
 
   private void checkAndReloadDataMapSchemas(boolean touchFile) throws IOException {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 08841b6..b32367b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -373,8 +373,7 @@
 
   def updateColumnName(attr: Attribute, counter: Int): String = {
     val name = getUpdatedName(attr.name, counter)
-    val value = attr.qualifier.map(qualifier => qualifier + "_" + name)
-    if (value.nonEmpty) value.head else name
+    attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
   }
 
   def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
@@ -474,7 +473,7 @@
   }
 
   def createAttrReference(ref: NamedExpression, name: String): Alias = {
-    CarbonToSparkAdapter.createAliasRef(ref, name, exprId = ref.exprId)
+    Alias(ref, name)(exprId = ref.exprId, qualifier = None)
   }
 
   case class AttributeKey(exp: Expression) {
@@ -538,13 +537,13 @@
         case attr: AttributeReference =>
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
-              CarbonToSparkAdapter.createAttributeReference(
-                name = a.name,
-                dataType = a.dataType,
-                nullable = a.nullable,
-                metadata = a.metadata,
-                exprId = a.exprId,
-                qualifier = attr.qualifier)
+              CarbonToSparkAdapter.createAttributeReference(a.name,
+                a.dataType,
+                a.nullable,
+                a.metadata,
+                a.exprId,
+                attr.qualifier,
+                a)
             } else {
               a
             }
@@ -576,9 +575,9 @@
     outputSel.zip(subsumerOutputList).map{ case (l, r) =>
       l match {
         case attr: AttributeReference =>
-          CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
+          Alias(attr, r.name)(r.exprId, None)
         case a@Alias(attr: AttributeReference, name) =>
-          CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
+          Alias(attr, r.name)(r.exprId, None)
         case other => other
       }
     }
@@ -595,13 +594,13 @@
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
               CarbonToSparkAdapter
-                .createAttributeReference(
-                  a.name,
+                .createAttributeReference(a.name,
                   a.dataType,
                   a.nullable,
                   a.metadata,
                   a.exprId,
-                  attr.qualifier)
+                  attr.qualifier,
+                  a)
             } else {
               a
             }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index cff5c41..3ddb0fc 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -125,18 +125,17 @@
             arrayBuffer += relation
           }
           var qualifier: Option[String] = None
-          if (attr.qualifier.nonEmpty) {
-            qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
+          if (attr.qualifier.isDefined) {
+            qualifier = if (attr.qualifier.get.startsWith("gen_sub")) {
               Some(carbonTable.getTableName)
             } else {
-              attr.qualifier.headOption
+              attr.qualifier
             }
           }
           fieldToDataMapFieldMap +=
-          getFieldToDataMapFields(
-            attr.name,
+          getFieldToDataMapFields(attr.name,
             attr.dataType,
-            qualifier.headOption,
+            qualifier,
             "",
             arrayBuffer,
             carbonTable.getTableName)
@@ -249,8 +248,7 @@
   /**
    * Below method will be used to get the fields object for mv table
    */
-  private def getFieldToDataMapFields(
-      name: String,
+  private def getFieldToDataMapFields(name: String,
       dataType: DataType,
       qualifier: Option[String],
       aggregateType: String,
@@ -315,7 +313,7 @@
     val updatedOutList = outputList.map { col =>
       val duplicateColumn = duplicateNameCols
         .find(a => a.semanticEquals(col))
-      val qualifiedName = col.qualifier.headOption.getOrElse(s"${ col.exprId.id }") + "_" + col.name
+      val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
       if (duplicateColumn.isDefined) {
         val attributesOfDuplicateCol = duplicateColumn.get.collect {
           case a: AttributeReference => a
@@ -331,7 +329,7 @@
           attributeOfCol.exists(a => a.semanticEquals(expr)))
         if (!isStrictDuplicate) {
           Alias(col, qualifiedName)(exprId = col.exprId)
-        } else if (col.qualifier.nonEmpty) {
+        } else if (col.qualifier.isDefined) {
           Alias(col, qualifiedName)(exprId = col.exprId)
           // this check is added in scenario where the column is direct Attribute reference and
           // since duplicate columns select is allowed, we should just put alias for those columns
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 6fbc87f..7e8eb96 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -18,12 +18,11 @@
 
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
-import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.mv.datamap.MVHelper
 import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
@@ -96,12 +95,9 @@
       // Replace all compensation1 attributes with refrences of subsumer attributeset
       val compensationFinal = compensation1.transformExpressions {
         case ref: Attribute if subqueryAttributeSet.contains(ref) =>
-          CarbonToSparkAdapter.createAttributeReference(
-            ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
-            exprId = ref.exprId, qualifier = subsumerName)
+          AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
         case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
-          CarbonToSparkAdapter.createAliasRef(
-            alias.child, alias.name, alias.exprId, subsumerName)
+          Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
       }
       compensationFinal
     } else {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index 2b4247e..cb2043e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -17,11 +17,10 @@
 
 package org.apache.carbondata.mv.plans.modular
 
-import org.apache.spark.sql.{CarbonToSparkAdapter, SQLConf}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.Metadata
+import org.apache.spark.sql.SQLConf
 
 import org.apache.carbondata.mv.plans
 import org.apache.carbondata.mv.plans._
@@ -199,18 +198,18 @@
                                                               .isInstanceOf[Attribute]))
           val aggOutputList = aggTransMap.values.flatMap(t => t._2)
             .map { ref =>
-              CarbonToSparkAdapter.createAttributeReference(
-                ref.name, ref.dataType, nullable = true, Metadata.empty,
-                ref.exprId, Some(hFactName))
+              AttributeReference(ref.name, ref.dataType)(
+                exprId = ref.exprId,
+                qualifier = Some(hFactName))
             }
           val hFactOutputSet = hFact.outputSet
           // Update the outputlist qualifier
           val hOutputList = (attrOutputList ++ aggOutputList).map {attr =>
             attr.transform {
               case ref: Attribute if hFactOutputSet.contains(ref) =>
-                CarbonToSparkAdapter.createAttributeReference(
-                  ref.name, ref.dataType, nullable = true, Metadata.empty,
-                  ref.exprId, Some(hFactName))
+                AttributeReference(ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = Some(hFactName))
             }
           }.asInstanceOf[Seq[NamedExpression]]
 
@@ -218,9 +217,9 @@
           val hPredList = s.predicateList.map{ pred =>
             pred.transform {
               case ref: Attribute if hFactOutputSet.contains(ref) =>
-                CarbonToSparkAdapter.createAttributeReference(
-                  ref.name, ref.dataType, nullable = true, Metadata.empty,
-                  ref.exprId, Some(hFactName))
+                AttributeReference(ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = Some(hFactName))
             }
           }
           val hSel = s.copy(
@@ -242,9 +241,9 @@
           val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
           wip.transformExpressions {
             case ref: Attribute if hFactOutputSet.contains(ref) =>
-              CarbonToSparkAdapter.createAttributeReference(
-                ref.name, ref.dataType, nullable = true, Metadata.empty,
-                ref.exprId, Some(hFactName))
+              AttributeReference(ref.name, ref.dataType)(
+                exprId = ref.exprId,
+                qualifier = Some(hFactName))
           }
         }
       }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index b694e78..30857c8 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -52,11 +52,9 @@
       plan transform {
         case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _, _), _, _, _) =>
           val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
-          val makeupmap: Map[Int, String] = children.zipWithIndex.flatMap {
+          val makeupmap = children.zipWithIndex.flatMap {
             case (child, i) =>
-              aq.find(child.outputSet.contains(_))
-                .flatMap(_.qualifier.headOption)
-                .map((i, _))
+              aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
           }.toMap
           g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
       }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 7068b7e..0bbacc4 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -110,7 +110,10 @@
         RewriteCorrelatedScalarSubquery,
         EliminateSerialization,
         SparkSQLUtil.getRemoveRedundantAliasesObj(),
-        RemoveRedundantProject) ++
+        RemoveRedundantProject,
+        SimplifyCreateStructOps,
+        SimplifyCreateArrayOps,
+        SimplifyCreateMapOps) ++
                                             extendedOperatorOptimizationRules: _*) ::
     Batch(
       "Check Cartesian Products", Once,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index 2033342..3b6c725 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -167,9 +167,7 @@
     val aq = attributeSet.filter(_.qualifier.nonEmpty)
     children.zipWithIndex.flatMap {
       case (child, i) =>
-        aq.find(child.outputSet.contains(_))
-          .flatMap(_.qualifier.headOption)
-          .map((i, _))
+        aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
     }.toMap
   }
 
@@ -355,13 +353,28 @@
           Seq.empty)
       case l: LogicalRelation =>
         val tableIdentifier = l.catalogTable.map(_.identifier)
-        val database = tableIdentifier.flatMap(_.database).orNull
-        val table = tableIdentifier.map(_.table).orNull
+        val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
+        val table = tableIdentifier.map(_.table).getOrElse(null)
         Some(database, table, l.output, Nil, NoFlags, Seq.empty)
       case l: LocalRelation => // used for unit test
         Some(null, null, l.output, Nil, NoFlags, Seq.empty)
       case _ =>
+        // this check is added as we get MetastoreRelation in spark2.1,
+        // this is removed in later spark version
+        // TODO: this check can be removed once 2.1 support is removed from carbon
+        if (SparkUtil.isSparkVersionEqualTo("2.1") &&
+            plan.getClass.getName.equals("org.apache.spark.sql.hive.MetastoreRelation")) {
+          val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("catalogTable", plan)
+            .asInstanceOf[CatalogTable]
+          Some(catalogTable.database,
+            catalogTable.identifier.table,
+            plan.output,
+            Nil,
+            NoFlags,
+            Seq.empty)
+        } else {
           None
+        }
     }
   }
 }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index 366284b..d3ce38d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -204,7 +204,7 @@
                          s.child match {
                            case a: Alias =>
                              val qualifierPrefix = a.qualifier
-                               .map(_ + ".").headOption.getOrElse("")
+                               .map(_ + ".").getOrElse("")
                              s"$qualifierPrefix${
                                quoteIdentifier(a
                                  .name)
@@ -221,7 +221,7 @@
                        s.child match {
                          case a: Alias =>
                            val qualifierPrefix = a.qualifier.map(_ + ".")
-                             .headOption.getOrElse("")
+                             .getOrElse("")
                            s"$qualifierPrefix${ quoteIdentifier(a.name) }"
 
                          case other => other.sql
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
index c3a3a68..b17eea2 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -21,11 +21,8 @@
 
 import scala.collection.immutable
 
-import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
-import org.apache.spark.sql.types.Metadata
-import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.mv.expressions.modular._
 import org.apache.carbondata.mv.plans._
@@ -119,19 +116,18 @@
                 if (i > -1) {
                   // this is a walk around for mystery of spark qualifier
                   if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
-                    CarbonToSparkAdapter.createAttributeReference(
-                      ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
-                      exprId = ref.exprId, qualifier = Some(aliasMap(i)))
+                    AttributeReference(
+                      ref.name,
+                      ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
                   } else {
                     ref
                   }
                 } else {
                   attrMap.get(ref) match {
                     case Some(alias) =>
-                      CarbonToSparkAdapter.createAttributeReference(
+                      AttributeReference(
                         alias.child.asInstanceOf[AttributeReference].name,
-                        ref.dataType, nullable = true, metadata = Metadata.empty,
-                        exprId = ref.exprId,
+                        ref.dataType)(exprId = ref.exprId,
                         alias.child.asInstanceOf[AttributeReference].qualifier)
                     case None => ref
                   }
@@ -182,12 +178,13 @@
                 list = list :+ ((index, subqueryName))
                 newS = newS.transformExpressions {
                   case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
-                    CarbonToSparkAdapter.createAttributeReference(
-                      ref.name, ref.dataType, nullable = true, Metadata.empty,
-                      ref.exprId, Some(subqueryName))
+                    AttributeReference(ref.name, ref.dataType)(
+                      exprId = ref.exprId,
+                      qualifier = Some(subqueryName))
                   case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
-                    CarbonToSparkAdapter.createAliasRef(
-                      alias.child, alias.name, alias.exprId, Some(subqueryName))
+                    Alias(alias.child, alias.name)(
+                      exprId = alias.exprId,
+                      qualifier = Some(subqueryName))
                 }
 
               case _ =>
@@ -215,12 +212,13 @@
             }
             newG.transformExpressions {
               case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
-                CarbonToSparkAdapter.createAttributeReference(
-                  ref.name, ref.dataType, nullable = true, Metadata.empty,
-                  ref.exprId, Some(subqueryName))
+                AttributeReference(ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = Some(subqueryName))
               case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
-                CarbonToSparkAdapter.createAliasRef(
-                  alias.child, alias.name, alias.exprId, Some(subqueryName))
+                Alias(alias.child, alias.name)(
+                  exprId = alias.exprId,
+                  qualifier = Some(subqueryName))
             }.copy(alias = Some(subqueryName))
         }
       }
diff --git a/docs/alluxio-guide.md b/docs/alluxio-guide.md
index bad1fc0..b1bfeeb 100644
--- a/docs/alluxio-guide.md
+++ b/docs/alluxio-guide.md
@@ -50,7 +50,7 @@
 ### Running spark-shell
  - Running the command in spark path
  ```$command
-./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
+./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
 ```
  - Testing use alluxio by CarbonSession
  ```$scala
@@ -98,7 +98,7 @@
 --master local \
 --jars ${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar,${CARBONDATA_PATH}/examples/spark2/target/carbondata-examples-1.6.0-SNAPSHOT.jar \
 --class org.apache.carbondata.examples.AlluxioExample \
-${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar \
+${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar \
 false
 ```
 **NOTE**: Please set runShell as false, which can avoid dependency on alluxio shell module.
diff --git a/docs/faq.md b/docs/faq.md
index 88ca186..16cdfa5 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -316,7 +316,7 @@
   2. Use the following command :
 
   ```
-  mvn -Pspark-2.4 -Dspark.version {yourSparkVersion} clean package
+  mvn -Pspark-2.1 -Dspark.version {yourSparkVersion} clean package
   ```
   
 Note : Refrain from using "mvn clean package" without specifying the profile.
diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 0c49f35..483585f 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -237,9 +237,9 @@
   $ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
   ```
   Replace the spark and hadoop version with the version used in your cluster.
-  For example, if you are using Spark 2.4.4, you would like to compile using:
+  For example, if you are using Spark 2.2.1 and Hadoop 2.7.2, you would like to compile using:
   ```
-  mvn -DskipTests -Pspark-2.4 -Dspark.version=2.4.4 -Dhadoop.version=2.7.2 clean package
+  mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 -Dhadoop.version=2.7.2 clean package
   ```
 
   Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index d007e03..b66fce4 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -37,11 +37,11 @@
     - [CLOSE STREAM](#close-stream)
 
 ## Quick example
-Download and unzip spark-2.4.4-bin-hadoop2.7.tgz, and export $SPARK_HOME
+Download and unzip spark-2.2.0-bin-hadoop2.7.tgz, and export $SPARK_HOME
 
-Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.6.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
+Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
 ```shell
-mvn clean package -DskipTests -Pspark-2.4
+mvn clean package -DskipTests -Pspark-2.2
 ```
 
 Start a socket data server in a terminal
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index d5c1188..b6921f2 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -37,9 +37,9 @@
       s"$rootPath/examples/spark2/src/main/resources/log4j.properties")
 
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
     val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
-    spark.sparkContext.setLogLevel("error")
+    spark.sparkContext.setLogLevel("INFO")
     exampleBody(spark)
     spark.close()
   }
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
index de3b5bc..73bf941 100644
--- a/integration/flink/pom.xml
+++ b/integration/flink/pom.xml
@@ -184,6 +184,26 @@
 
     <profiles>
         <profile>
+            <id>spark-2.2</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.carbondata</groupId>
+                    <artifactId>carbondata-spark2</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>org.apache.hive</groupId>
+                            <artifactId>hive-exec</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
             <id>spark-2.3</id>
             <activation>
                 <activeByDefault>true</activeByDefault>
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 0790763..199ff84 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -191,103 +191,4 @@
     </plugins>
   </build>
 
-  <profiles>
-    <profile>
-      <id>build-all</id>
-      <properties>
-        <spark.version>2.3.4</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-    </profile>
-    <profile>
-      <id>sdvtest</id>
-      <properties>
-        <maven.test.skip>true</maven.test.skip>
-      </properties>
-    </profile>
-    <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <spark.version>2.3.4</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.4</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.3</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.4</id>
-      <properties>
-        <spark.version>2.4.4</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark2.3</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.4</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
 </project>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 2548110..f629260 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,12 +37,11 @@
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
-import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.adapter.CarbonToSparkAdapter
 import org.apache.carbondata.spark.util.CommonUtil
 
 object CsvRDDHelper {
@@ -94,9 +93,9 @@
     def closePartition(): Unit = {
       if (currentFiles.nonEmpty) {
         val newPartition =
-          CarbonToSparkAdapter.createFilePartition(
+          FilePartition(
             partitions.size,
-            currentFiles)
+            currentFiles.toArray.toSeq)
         partitions += newPartition
       }
       currentFiles.clear()
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index a46568a..13e7c45 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -17,22 +17,23 @@
 
 package org.apache.spark.sql.util
 
+import java.lang.reflect.Method
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateView
+import org.apache.spark.sql.catalyst.analysis.EmptyRule
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
-import org.apache.spark.sql.catalyst.optimizer.{CheckCartesianProducts, EliminateOuterJoin, NullPropagation, PullupCorrelatedPredicates, RemoveRedundantAliases, ReorderJoin}
-import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -50,60 +51,166 @@
   }
 
   def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): Statistics = {
-    logicalPlanObj.stats
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val method: Method = logicalPlanObj.getClass.getMethod("stats", classOf[SQLConf])
+      method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val method: Method = logicalPlanObj.getClass.getMethod("stats")
+      method.invoke(logicalPlanObj).asInstanceOf[Statistics]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
       : NamedExpression = {
-    QueryPlan.normalizeExprId(r, input)
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
+      clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
+        invoke(null, r, input).asInstanceOf[NamedExpression]
+    } else {
+      r
+    }
   }
 
   def getStatisticsObj(outputList: Seq[NamedExpression],
                        plan: LogicalPlan, stats: Statistics,
                        aliasMap: Option[AttributeMap[Attribute]] = None)
   : Statistics = {
-    val output = outputList.map(_.toAttribute)
-    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
-      table => AttributeMap(table.output.zip(output))
+    val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val output = outputList.map(_.toAttribute)
+      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+        table => AttributeMap(table.output.zip(output))
+      }
+      val rewrites = mapSeq.head
+      val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
+        getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
+      var attributeStats = AttributeMap(attributes.iterator
+        .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+      if (aliasMap.isDefined) {
+        attributeStats = AttributeMap(
+          attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+      }
+      val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
+      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+        stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
+    } else {
+      val output = outputList.map(_.name)
+      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+        table => table.output.map(_.name).zip(output).toMap
+      }
+      val rewrites = mapSeq.head
+      val colStats = CarbonReflectionUtils.getField("colStats", stats)
+        .asInstanceOf[Map[String, ColumnStat]]
+      var attributeStats = colStats.iterator
+        .map { pair => (rewrites(pair._1), pair._2) }.toMap
+      if (aliasMap.isDefined) {
+        val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
+        attributeStats =
+          attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
+            , pair._2))
+      }
+      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+        stats.rowCount, attributeStats).asInstanceOf[Statistics]
     }
-    val rewrites = mapSeq.head
-    val attributes : AttributeMap[ColumnStat] = stats.attributeStats
-    var attributeStats = AttributeMap(attributes.iterator
-      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
-    if (aliasMap.isDefined) {
-      attributeStats = AttributeMap(
-        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
-    }
-    val hints = stats.hints
-    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, hints)
   }
 
   def getEliminateViewObj(): Rule[LogicalPlan] = {
-    EliminateView
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
+      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      EmptyRule
+    }
   }
 
   def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
-    PullupCorrelatedPredicates
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
+      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      EmptyRule
+    }
   }
 
   def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
-    RemoveRedundantAliases
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
+      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      EmptyRule
+    }
   }
 
   def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
-    ReorderJoin
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
+      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    }
+    else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
-    EliminateOuterJoin
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin";
+      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    }
+    else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = {
-    NullPropagation
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation";
+      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = {
-    CheckCartesianProducts
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
+      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
+      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    }
+    else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   /**
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 93e66ea..46692df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,22 +17,24 @@
 
 package org.apache.spark.util
 
+import java.lang.reflect.Method
+
 import scala.reflect.runtime._
 import scala.reflect.runtime.universe._
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.AstBuilder
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -58,19 +60,45 @@
   def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
     val im = rm.reflect(obj)
     im.symbol.typeSignature.members.find(_.name.toString.equals(name))
-      .map(l => im.reflectField(l.asTerm).get).orNull
+      .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
   }
 
   def getUnresolvedRelation(
       tableIdentifier: TableIdentifier,
       tableAlias: Option[String] = None): UnresolvedRelation = {
-    UnresolvedRelation(tableIdentifier)
+    val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      createObject(
+        className,
+        tableIdentifier,
+        tableAlias)._1.asInstanceOf[UnresolvedRelation]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      createObject(
+        className,
+        tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
+    } else {
+      throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
+    }
   }
 
   def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
       relation: LogicalPlan,
       view: Option[TableIdentifier]): SubqueryAlias = {
-    SubqueryAlias(alias.getOrElse(""), relation)
+    val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      createObject(
+        className,
+        alias.getOrElse(""),
+        relation,
+        Option(view))._1.asInstanceOf[SubqueryAlias]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      createObject(
+        className,
+        alias.getOrElse(""),
+        relation)._1.asInstanceOf[SubqueryAlias]
+    } else {
+      throw new UnsupportedOperationException("Unsupported Spark version")
+    }
   }
 
   def getInsertIntoCommand(table: LogicalPlan,
@@ -78,23 +106,58 @@
       query: LogicalPlan,
       overwrite: Boolean,
       ifPartitionNotExists: Boolean): InsertIntoTable = {
-    InsertIntoTable(
-      table,
-      partition,
-      query,
-      overwrite,
-      ifPartitionNotExists)
+    val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val overwriteOptions = createObject(
+        "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
+        overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
+      createObject(
+        className,
+        table,
+        partition,
+        query,
+        overwriteOptions,
+        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2") ) {
+      createObject(
+        className,
+        table,
+        partition,
+        query,
+        overwrite.asInstanceOf[Object],
+        ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
+    } else {
+      throw new UnsupportedOperationException("Unsupported Spark version")
+    }
   }
 
   def getLogicalRelation(relation: BaseRelation,
       expectedOutputAttributes: Seq[Attribute],
       catalogTable: Option[CatalogTable],
       isStreaming: Boolean): LogicalRelation = {
-    new LogicalRelation(
-      relation,
-      expectedOutputAttributes.asInstanceOf[Seq[AttributeReference]],
-      catalogTable,
-      isStreaming)
+    val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      createObject(
+        className,
+        relation,
+        Some(expectedOutputAttributes),
+        catalogTable)._1.asInstanceOf[LogicalRelation]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      createObject(
+        className,
+        relation,
+        expectedOutputAttributes,
+        catalogTable)._1.asInstanceOf[LogicalRelation]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+      createObject(
+        className,
+        relation,
+        expectedOutputAttributes,
+        catalogTable,
+        isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
+    } else {
+      throw new UnsupportedOperationException("Unsupported Spark version")
+    }
   }
 
 
@@ -145,28 +208,46 @@
   def getSessionState(sparkContext: SparkContext,
       carbonSession: Object,
       useHiveMetaStore: Boolean): Any = {
-    if (useHiveMetaStore) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-        "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
-      val tuple = createObject(className, carbonSession, None)
-      val method = tuple._2.getMethod("build")
-      method.invoke(tuple._1)
+        "org.apache.spark.sql.hive.CarbonSessionState")
+      createObject(className, carbonSession)._1
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      if (useHiveMetaStore) {
+        val className = sparkContext.conf.get(
+          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+          "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
+        val tuple = createObject(className, carbonSession, None)
+        val method = tuple._2.getMethod("build")
+        method.invoke(tuple._1)
+      } else {
+        val className = sparkContext.conf.get(
+          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+          "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
+        val tuple = createObject(className, carbonSession, None)
+        val method = tuple._2.getMethod("build")
+        method.invoke(tuple._1)
+      }
     } else {
-      val className = sparkContext.conf.get(
-        CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-        "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
-      val tuple = createObject(className, carbonSession, None)
-      val method = tuple._2.getMethod("build")
-      method.invoke(tuple._1)
+      throw new UnsupportedOperationException("Spark version not supported")
     }
   }
 
   def hasPredicateSubquery(filterExp: Expression) : Boolean = {
-    val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
-    val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
-    val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
-    hasSubquery
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
+      val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression])
+      val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
+      hasSubquery
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
+      val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
+      val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
+      hasSubquery
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   def getDescribeTableFormattedField[T: TypeTag : reflect.ClassTag](obj: T): Boolean = {
@@ -184,10 +265,19 @@
       rdd: RDD[InternalRow],
       partition: Partitioning,
       metadata: Map[String, String]): RowDataSourceScanExec = {
-    RowDataSourceScanExec(output, output.map(output.indexOf),
-      pushedFilters.toSet, handledFilters.toSet, rdd,
-      relation.relation,
-      relation.catalogTable.map(_.identifier))
+    val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
+    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
+      createObject(className, output, rdd, relation.relation,
+        partition, metadata,
+        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      createObject(className, output, output.map(output.indexOf),
+        pushedFilters.toSet, handledFilters.toSet, rdd,
+        relation.relation,
+        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   def invokewriteAndReadMethod(dataSourceObj: DataSource,
@@ -197,7 +287,25 @@
       mode: SaveMode,
       query: LogicalPlan,
       physicalPlan: SparkPlan): BaseRelation = {
-    dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val method: Method = dataSourceObj.getClass
+        .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
+      method.invoke(dataSourceObj, mode, dataFrame)
+        .asInstanceOf[BaseRelation]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+      val method: Method = dataSourceObj.getClass
+        .getMethod("writeAndRead",
+          classOf[SaveMode],
+          classOf[LogicalPlan],
+          classOf[Seq[String]],
+          classOf[SparkPlan])
+      // since spark 2.3.2 version (SPARK-PR#22346),
+      // change 'query.output' to 'query.output.map(_.name)'
+      method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
+        .asInstanceOf[BaseRelation]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 
   /**
@@ -208,7 +316,9 @@
    */
   def invokeAlterTableAddColumn(table: TableIdentifier,
       colsToAdd: Seq[StructField]): Object = {
-    AlterTableAddColumnsCommand(table, colsToAdd)
+    val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
+    CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
+      ._1.asInstanceOf[RunnableCommand]
   }
 
   def createSingleObject(className: String): Any = {
@@ -275,6 +385,16 @@
 
   def invokeAnalyzerExecute(analyzer: Analyzer,
       plan: LogicalPlan): LogicalPlan = {
-    analyzer.executeAndCheck(plan)
+    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val method: Method = analyzer.getClass
+        .getMethod("execute", classOf[LogicalPlan])
+      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+      val method: Method = analyzer.getClass
+        .getMethod("executeAndCheck", classOf[LogicalPlan])
+      method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
   }
 }
diff --git a/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
deleted file mode 100644
index be82907..0000000
--- a/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.adapter
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
-    FilePartition(index, files.toArray)
-  }
-
-  def createAttributeReference(
-      name: String,
-      dataType: DataType,
-      nullable: Boolean,
-      metadata: Metadata,
-      exprId: ExprId,
-      qualifier: Option[String],
-      attrRef : NamedExpression = null): AttributeReference = {
-    val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qf)
-  }
-
-  def createAliasRef(
-      child: Expression,
-      name: String,
-      exprId: ExprId = NamedExpression.newExprId,
-      qualifier: Option[String] = None,
-      explicitMetadata: Option[Metadata] = None,
-      namedExpr : Option[NamedExpression] = None ) : Alias = {
-    Alias(child, name)(
-      exprId,
-      if (qualifier.nonEmpty) Seq(qualifier.get) else Seq(),
-      explicitMetadata)
-  }
-}
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index 1f1cac3..cda1954 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -192,6 +192,86 @@
       </properties>
     </profile>
     <profile>
+      <id>spark-2.1</id>
+      <properties>
+        <spark.version>2.1.0</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.3plus</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.1andspark2.2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.2</id>
+      <properties>
+        <spark.version>2.2.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.3plus</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.1andspark2.2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
       <id>spark-2.3</id>
       <activation>
         <activeByDefault>true</activeByDefault>
@@ -206,6 +286,30 @@
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.1andspark2.2</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.3plus</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
           </plugin>
         </plugins>
       </build>
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..605df66
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.api.Binary;
+
+public class CarbonDictionaryWrapper extends Dictionary {
+
+  private Binary[] binaries;
+
+  CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
+    super(encoding);
+    binaries = new Binary[dictionary.getDictionarySize()];
+    for (int i = 0; i < binaries.length; i++) {
+      binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
+    }
+  }
+
+  @Override
+  public int getMaxId() {
+    return binaries.length - 1;
+  }
+
+  @Override
+  public Binary decodeToBinary(int id) {
+    return binaries[id];
+  }
+}
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
new file mode 100644
index 0000000..7d23d7c
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import java.lang.reflect.Field;
+import java.math.BigInteger;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Adapter class which handles the columnar vector reading of the carbondata
+ * based on the spark ColumnVector and ColumnarBatch API. This proxy class
+ * handles the complexity of spark 2.3 version related api changes since
+ * spark ColumnVector and ColumnarBatch interfaces are still evolving.
+ */
+public class CarbonVectorProxy {
+
+  private ColumnarBatch columnarBatch;
+  private ColumnVectorProxy[] columnVectorProxies;
+
+
+  private void updateColumnVectors() {
+    try {
+      Field field = columnarBatch.getClass().getDeclaredField("columns");
+      field.setAccessible(true);
+      field.set(columnarBatch, columnVectorProxies);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Adapter class which handles the columnar vector reading of the carbondata
+   * based on the spark ColumnVector and ColumnarBatch API. This proxy class
+   * handles the complexity of spark 2.3 version related api changes since
+   * spark ColumnVector and ColumnarBatch interfaces are still evolving.
+   *
+   * @param memMode       which represent the type onheap or offheap vector.
+   * @param outputSchema, metadata related to current schema of table.
+   * @param rowNum        rows number for vector reading
+   * @param useLazyLoad   Whether to use lazy load while getting the data.
+   */
+  public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
+      boolean useLazyLoad) {
+    columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+    columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+    for (int i = 0; i < columnVectorProxies.length; i++) {
+      if (useLazyLoad) {
+        columnVectorProxies[i] =
+            new ColumnVectorProxyWithLazyLoad(columnarBatch.column(i), rowNum, memMode);
+      } else {
+        columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
+      }
+    }
+    updateColumnVectors();
+  }
+
+  public ColumnVectorProxy getColumnVector(int ordinal) {
+    return columnVectorProxies[ordinal];
+  }
+
+  /**
+   * Returns the number of rows for read, including filtered rows.
+   */
+  public int numRows() {
+    return columnarBatch.capacity();
+  }
+
+  /**
+   * This API will return a columnvector from a batch of column vector rows
+   * based on the ordinal
+   *
+   * @param ordinal
+   * @return
+   */
+  public ColumnVector column(int ordinal) {
+    return columnarBatch.column(ordinal);
+  }
+
+  /**
+   * Resets this column for writing. The currently stored values are no longer accessible.
+   */
+  public void reset() {
+    for (int i = 0; i < columnarBatch.numCols(); i++) {
+      ((ColumnVectorProxy) columnarBatch.column(i)).reset();
+    }
+  }
+
+  public void resetDictionaryIds(int ordinal) {
+    (((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
+  }
+
+  /**
+   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+   */
+  public InternalRow getRow(int rowId) {
+    return columnarBatch.getRow(rowId);
+  }
+
+  /**
+   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+   */
+  public Object getColumnarBatch() {
+    return columnarBatch;
+  }
+
+  /**
+   * Called to close all the columns in this batch. It is not valid to access the data after
+   * calling this. This must be called at the end to clean up memory allocations.
+   */
+  public void close() {
+    columnarBatch.close();
+  }
+
+  /**
+   * Sets the number of rows in this batch.
+   */
+  public void setNumRows(int numRows) {
+    columnarBatch.setNumRows(numRows);
+  }
+
+  public DataType dataType(int ordinal) {
+    return columnarBatch.column(ordinal).dataType();
+  }
+
+  public static class ColumnVectorProxy extends ColumnVector {
+
+    private ColumnVector vector;
+
+    public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
+      super(capacity, columnVector.dataType(), mode);
+      try {
+        Field childColumns =
+            columnVector.getClass().getSuperclass().getDeclaredField("childColumns");
+        childColumns.setAccessible(true);
+        Object o = childColumns.get(columnVector);
+        childColumns.set(this, o);
+        Field resultArray =
+            columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
+        resultArray.setAccessible(true);
+        Object o1 = resultArray.get(columnVector);
+        resultArray.set(this, o1);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      vector = columnVector;
+    }
+
+    public void putRowToColumnBatch(int rowId, Object value) {
+      org.apache.spark.sql.types.DataType t = dataType();
+      if (null == value) {
+        putNull(rowId);
+      } else {
+        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+          putBoolean(rowId, (boolean) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+          putByte(rowId, (byte) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+          putShort(rowId, (short) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+          putInt(rowId, (int) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+          putLong(rowId, (long) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+          putFloat(rowId, (float) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+          putDouble(rowId, (double) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+          UTF8String v = (UTF8String) value;
+          putByteArray(rowId, v.getBytes());
+        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+          DecimalType dt = (DecimalType) t;
+          Decimal d = Decimal.fromDecimal(value);
+          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+            putInt(rowId, (int) d.toUnscaledLong());
+          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+            putLong(rowId, d.toUnscaledLong());
+          } else {
+            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+            byte[] bytes = integer.toByteArray();
+            putByteArray(rowId, bytes, 0, bytes.length);
+          }
+        } else if (t instanceof CalendarIntervalType) {
+          CalendarInterval c = (CalendarInterval) value;
+          vector.getChildColumn(0).putInt(rowId, c.months);
+          vector.getChildColumn(1).putLong(rowId, c.microseconds);
+        } else if (t instanceof org.apache.spark.sql.types.DateType) {
+          putInt(rowId, (int) value);
+        } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+          putLong(rowId, (long) value);
+        }
+      }
+    }
+
+    public void putBoolean(int rowId, boolean value) {
+      vector.putBoolean(rowId, value);
+    }
+
+    public void putByte(int rowId, byte value) {
+      vector.putByte(rowId, value);
+    }
+
+    public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putBytes(rowId, count, src, srcIndex);
+    }
+
+    public void putShort(int rowId, short value) {
+      vector.putShort(rowId, value);
+    }
+
+    public void putInt(int rowId, int value) {
+      vector.putInt(rowId, value);
+    }
+
+    public void putFloat(int rowId, float value) {
+      vector.putFloat(rowId, value);
+    }
+
+    public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+      vector.putFloats(rowId, count, src, srcIndex);
+    }
+
+    public void putLong(int rowId, long value) {
+      vector.putLong(rowId, value);
+    }
+
+    public void putDouble(int rowId, double value) {
+      vector.putDouble(rowId, value);
+    }
+
+    public void putInts(int rowId, int count, int value) {
+      vector.putInts(rowId, count, value);
+    }
+
+    public void putInts(int rowId, int count, int[] src, int srcIndex) {
+      vector.putInts(rowId, count, src, srcIndex);
+    }
+
+    public void putShorts(int rowId, int count, short value) {
+      vector.putShorts(rowId, count, value);
+    }
+
+    public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+      vector.putShorts(rowId, count, src, srcIndex);
+    }
+
+    public void putLongs(int rowId, int count, long value) {
+      vector.putLongs(rowId, count, value);
+    }
+
+    public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+      vector.putLongs(rowId, count, src, srcIndex);
+    }
+
+    public void putDoubles(int rowId, int count, double value) {
+      vector.putDoubles(rowId, count, value);
+    }
+
+    public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+      vector.putDoubles(rowId, count, src, srcIndex);
+    }
+
+    public DataType dataType(int ordinal) {
+      return vector.dataType();
+    }
+
+    public void putNotNull(int rowId) {
+      vector.putNotNull(rowId);
+    }
+
+    public void putNotNulls(int rowId, int count) {
+      vector.putNotNulls(rowId, count);
+    }
+
+    public void putDictionaryInt(int rowId, int value) {
+      vector.getDictionaryIds().putInt(rowId, value);
+    }
+
+    public void setDictionary(CarbonDictionary dictionary) {
+      if (null != dictionary) {
+        CarbonDictionaryWrapper dictionaryWrapper =
+            new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
+        vector.setDictionary(dictionaryWrapper);
+        this.dictionary = dictionaryWrapper;
+      } else {
+        this.dictionary = null;
+        vector.setDictionary(null);
+      }
+    }
+
+    public void putNull(int rowId) {
+      vector.putNull(rowId);
+    }
+
+    public void putNulls(int rowId, int count) {
+      vector.putNulls(rowId, count);
+    }
+
+    public boolean hasDictionary() {
+      return vector.hasDictionary();
+    }
+
+    public ColumnVector reserveDictionaryIds(int capacity) {
+      this.dictionaryIds = vector.reserveDictionaryIds(capacity);
+      return dictionaryIds;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return vector.isNullAt(i);
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+      return vector.getBoolean(i);
+    }
+
+    @Override
+    public byte getByte(int i) {
+      return vector.getByte(i);
+    }
+
+    @Override
+    public short getShort(int i) {
+      return vector.getShort(i);
+    }
+
+    @Override
+    public int getInt(int i) {
+      return vector.getInt(i);
+    }
+
+    @Override
+    public long getLong(int i) {
+      return vector.getLong(i);
+    }
+
+    @Override
+    public float getFloat(int i) {
+      return vector.getFloat(i);
+    }
+
+    @Override
+    public double getDouble(int i) {
+      return vector.getDouble(i);
+    }
+
+    @Override
+    protected void reserveInternal(int capacity) {
+    }
+
+    @Override
+    public void reserve(int requiredCapacity) {
+      vector.reserve(requiredCapacity);
+    }
+
+    @Override
+    public long nullsNativeAddress() {
+      return vector.nullsNativeAddress();
+    }
+
+    @Override
+    public long valuesNativeAddress() {
+      return vector.valuesNativeAddress();
+    }
+
+    @Override
+    public void putBooleans(int rowId, int count, boolean value) {
+      vector.putBooleans(rowId, count, value);
+    }
+
+    @Override
+    public void putBytes(int rowId, int count, byte value) {
+      vector.putBytes(rowId, count, value);
+    }
+
+    @Override
+    public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putIntsLittleEndian(rowId, count, src, srcIndex);
+    }
+
+    @Override
+    public int getDictId(int rowId) {
+      return vector.getDictId(rowId);
+    }
+
+    @Override
+    public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putLongsLittleEndian(rowId, count, src, srcIndex);
+    }
+
+    @Override
+    public void putFloats(int rowId, int count, float value) {
+      vector.putFloats(rowId, count, value);
+    }
+
+    @Override
+    public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putFloats(rowId, count, src, srcIndex);
+    }
+
+    @Override
+    public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+      vector.putDoubles(rowId, count, src, srcIndex);
+    }
+
+    @Override
+    public void putArray(int rowId, int offset, int length) {
+      vector.putArray(rowId, offset, length);
+    }
+
+    @Override
+    public int getArrayLength(int rowId) {
+      return vector.getArrayLength(rowId);
+    }
+
+    @Override
+    public int getArrayOffset(int rowId) {
+      return vector.getArrayOffset(rowId);
+    }
+
+    @Override
+    public void loadBytes(Array array) {
+      vector.loadBytes(array);
+    }
+
+    @Override
+    public int putByteArray(int rowId, byte[] value, int offset, int count) {
+      return vector.putByteArray(rowId, value, offset, count);
+    }
+
+    /**
+     * It keeps all binary data of all rows to it.
+     * Should use along with @{putArray(int rowId, int offset, int length)} to keep lengths
+     * and offset.
+     */
+    public void putAllByteArray(byte[] data, int offset, int length) {
+      vector.arrayData().appendBytes(length, data, offset);
+    }
+
+    @Override
+    public void close() {
+      vector.close();
+    }
+
+    public void reset() {
+      if (isConstant) {
+        return;
+      }
+      vector.reset();
+    }
+
+    public void setLazyPage(LazyPageLoader lazyPage) {
+      lazyPage.loadPage();
+    }
+
+    public ColumnVector getVector() {
+      return vector;
+    }
+  }
+
+  public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
+
+    private ColumnVector vector;
+
+    private LazyPageLoader pageLoad;
+
+    private boolean isLoaded;
+
+    public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector, int capacity, MemoryMode mode) {
+      super(columnVector, capacity, mode);
+      vector = columnVector;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      checkPageLoaded();
+      return vector.isNullAt(i);
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+      checkPageLoaded();
+      return vector.getBoolean(i);
+    }
+
+    @Override
+    public byte getByte(int i) {
+      checkPageLoaded();
+      return vector.getByte(i);
+    }
+
+    @Override
+    public short getShort(int i) {
+      checkPageLoaded();
+      return vector.getShort(i);
+    }
+
+    @Override
+    public int getInt(int i) {
+      checkPageLoaded();
+      return vector.getInt(i);
+    }
+
+    @Override
+    public long getLong(int i) {
+      checkPageLoaded();
+      return vector.getLong(i);
+    }
+
+    @Override
+    public float getFloat(int i) {
+      checkPageLoaded();
+      return vector.getFloat(i);
+    }
+
+    @Override
+    public double getDouble(int i) {
+      checkPageLoaded();
+      return vector.getDouble(i);
+    }
+
+    @Override
+    public int getArrayLength(int rowId) {
+      checkPageLoaded();
+      return vector.getArrayLength(rowId);
+    }
+
+    @Override
+    public int getArrayOffset(int rowId) {
+      checkPageLoaded();
+      return vector.getArrayOffset(rowId);
+    }
+
+    private void checkPageLoaded() {
+      if (!isLoaded) {
+        if (pageLoad != null) {
+          pageLoad.loadPage();
+        }
+        isLoaded = true;
+      }
+    }
+
+    public void reset() {
+      if (isConstant) {
+        return;
+      }
+      isLoaded = false;
+      vector.reset();
+    }
+
+    public void setLazyPage(LazyPageLoader lazyPage) {
+      this.pageLoad = lazyPage;
+    }
+
+  }
+}
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
similarity index 100%
rename from integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
rename to integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
similarity index 100%
rename from integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
rename to integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
similarity index 100%
rename from integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
rename to integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index dfebf6f..7b65e0b 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -253,12 +253,9 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
+      <id>spark-2.1</id>
       <properties>
-        <spark.version>2.3.4</spark.version>
+        <spark.version>2.1.0</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -269,7 +266,9 @@
             <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
               <excludes>
-                <exclude>src/main/spark2.4</exclude>
+                <exclude>src/main/spark2.2</exclude>
+                <exclude>src/main/spark2.3</exclude>
+                <exclude>src/main/commonTo2.2And2.3</exclude>
               </excludes>
             </configuration>
           </plugin>
@@ -286,7 +285,8 @@
                 </goals>
                 <configuration>
                   <sources>
-                    <source>src/main/spark2.3</source>
+                    <source>src/main/spark2.1</source>
+                    <source>src/main/commonTo2.1And2.2</source>
                   </sources>
                 </configuration>
               </execution>
@@ -296,9 +296,9 @@
       </build>
     </profile>
     <profile>
-      <id>spark-2.4</id>
+      <id>spark-2.2</id>
       <properties>
-        <spark.version>2.4.4</spark.version>
+        <spark.version>2.2.1</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -309,6 +309,7 @@
             <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
               <excludes>
+                <exclude>src/main/spark2.1</exclude>
                 <exclude>src/main/spark2.3</exclude>
               </excludes>
             </configuration>
@@ -326,7 +327,55 @@
                 </goals>
                 <configuration>
                   <sources>
-                    <source>src/main/spark2.4</source>
+                    <source>src/main/spark2.2</source>
+                    <source>src/main/commonTo2.2And2.3</source>
+                    <source>src/main/commonTo2.1And2.2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.3</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <spark.version>2.3.4</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.1</exclude>
+                <exclude>src/main/spark2.2</exclude>
+                <exclude>src/main/commonTo2.1And2.2</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.3</source>
+                    <source>src/main/commonTo2.2And2.3</source>
                   </sources>
                 </configuration>
               </execution>
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
similarity index 81%
rename from integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
rename to integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 60ee7ea..7605574 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 
@@ -32,7 +32,7 @@
     val rdd: RDD[InternalRow],
     @transient override val relation: HadoopFsRelation,
     val partitioning: Partitioning,
-    val md: Map[String, String],
+    override val metadata: Map[String, String],
     identifier: Option[TableIdentifier],
     @transient private val logicalRelation: LogicalRelation)
   extends FileSourceScanExec(
@@ -40,20 +40,14 @@
     output,
     relation.dataSchema,
     Seq.empty,
-    None,
     Seq.empty,
     identifier) {
 
-  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
-  override lazy val supportsBatch: Boolean = true
+  override val supportsBatch: Boolean = true
 
-  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
-  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
     (partitioning, Nil)
 
-  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
-  override lazy val metadata: Map[String, String] = md
-
   override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
 
 }
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
similarity index 98%
rename from integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
index 7b20c06..d180cd3 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -38,7 +37,6 @@
       output,
       outputSchema,
       partitionFilters,
-      None,
       dataFilters,
       tableIdentifier)
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
similarity index 99%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
index 04beea7..8f4d45e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -47,4 +47,4 @@
       logicalPlan
     }
   }
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
similarity index 97%
rename from integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
index 4ad3b11..36f166d 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -21,20 +21,21 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
similarity index 68%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
index e19966f..72d3ae2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
@@ -1,20 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.CarbonDatasourceHadoopRelation
@@ -46,7 +29,7 @@
             }
             Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
 
-          case In(value, Seq(l: ListQuery)) =>
+          case In(value, Seq(l:ListQuery)) =>
             val tPlan = l.plan.transform {
               case lr: LogicalRelation
                 if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
@@ -58,4 +41,4 @@
     }
     transFormedPlan
   }
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
similarity index 92%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
index 44b3bfd..f78c785 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -27,11 +27,12 @@
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
@@ -60,8 +61,8 @@
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
   extends HiveSessionCatalog (
-    SparkAdapter.getExternalCatalogCatalog(externalCatalog),
-    SparkAdapter.getGlobalTempViewManager(globalTempViewManager),
+    externalCatalog,
+    globalTempViewManager,
     new HiveMetastoreCatalog(sparkSession),
     functionRegistry,
     conf,
@@ -110,9 +111,8 @@
    * @return
    */
   override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    SparkAdapter.getHiveExternalCatalog(
-      sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
-    ).client
+    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
   }
 
   override def alterAddColumns(tableIdentifier: TableIdentifier,
@@ -174,10 +174,9 @@
    * @param identifier
    * @return
    */
-  override def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+      identifier: TableIdentifier) = {
     CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
   }
 
@@ -234,14 +233,14 @@
   }
 
   private def externalCatalog: HiveExternalCatalog =
-    SparkAdapter.getHiveExternalCatalog(session.sharedState.externalCatalog)
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
 
   /**
    * Create a Hive aware resource loader.
    */
   override protected lazy val resourceLoader: HiveSessionResourceLoader = {
     val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, SparkAdapter.getHiveClient(client))
+    new HiveSessionResourceLoader(session, client)
   }
 
   override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
@@ -275,4 +274,4 @@
   }
 
   override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
similarity index 85%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e9bcb43..e3f1d3f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -1,36 +1,39 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.Callable
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
-import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.util.SparkTypeConverter
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 /**
  * This class refresh the relation from cache if the carbontable in
@@ -174,7 +177,8 @@
 
   def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case sa@SubqueryAlias(_, MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
+      case sa@SubqueryAlias(_,
+      MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
         sa.copy(child = sa.child.asInstanceOf[LogicalRelation].copy())
       case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
         plan.asInstanceOf[LogicalRelation].copy()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
similarity index 100%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
similarity index 98%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index 0335b36..ee9fb0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,12 +20,12 @@
 
 import java.net.URI
 
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand}
 import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.util.CarbonReflectionUtils
 
 /**
@@ -48,7 +49,7 @@
     Seq.empty
   }
 
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
+  override def processData(sparkSession: SparkSession): Seq[Row] ={
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
similarity index 100%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index 78d6a46..aa650e0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -17,8 +17,13 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
 
 case class CastExpr(expr: Expression) extends Filter {
   override def references: Array[String] = null
@@ -28,6 +33,26 @@
   override def references: Array[String] = null
 }
 
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
+  extends LeafExpression with NamedExpression with CodegenFallback {
+
+  type EvaluatedType = Any
+
+  override def toString: String = s"input[" + colExp.getColIndex + "]"
+
+  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
+
+  override def name: String = colExp.getColumnName
+
+  override def toAttribute: Attribute = throw new UnsupportedOperationException
+
+  override def exprId: ExprId = throw new UnsupportedOperationException
+
+  override def qualifier: Option[String] = null
+
+  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
+}
+
 case class CarbonEndsWith(expr: Expression) extends Filter {
   override def references: Array[String] = null
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 23c078a..e020a99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -258,67 +258,41 @@
             s"""
                |org.apache.spark.sql.DictTuple $value = $decodeDecimal($dictRef, ${ev.value});
                  """.stripMargin
-            CarbonToSparkAdapter.createExprCode(
-              code,
-              s"$value.getIsNull()",
-              s"((org.apache.spark.sql.types.Decimal)$value.getValue())",
-              expr.dataType)
+            ExprCode(code, s"$value.getIsNull()",
+              s"((org.apache.spark.sql.types.Decimal)$value.getValue())")
           } else {
             getDictionaryColumnIds(index)._3.getDataType match {
               case CarbonDataTypes.INT => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeInt($dictRef, ${ ev.value });
                  """.stripMargin
-                CarbonToSparkAdapter.createExprCode(
-                  code,
-                  s"$value.getIsNull()",
-                  s"((Integer)$value.getValue())",
-                  expr.dataType)
+                ExprCode(code, s"$value.getIsNull()", s"((Integer)$value.getValue())")
               case CarbonDataTypes.SHORT => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeShort($dictRef, ${ ev.value });
                  """.stripMargin
-                CarbonToSparkAdapter.createExprCode(
-                  code,
-                  s"$value.getIsNull()",
-                  s"((Short)$value.getValue())",
-                  expr.dataType)
+                ExprCode(code, s"$value.getIsNull()", s"((Short)$value.getValue())")
               case CarbonDataTypes.DOUBLE => code +=
                  s"""
                     |org.apache.spark.sql.DictTuple $value = $decodeDouble($dictRef, ${ ev.value });
                  """.stripMargin
-                CarbonToSparkAdapter.createExprCode(
-                  code,
-                  s"$value.getIsNull()",
-                  s"((Double)$value.getValue())",
-                  expr.dataType)
+                ExprCode(code, s"$value.getIsNull()", s"((Double)$value.getValue())")
               case CarbonDataTypes.LONG => code +=
                  s"""
                     |org.apache.spark.sql.DictTuple $value = $decodeLong($dictRef, ${ ev.value });
                  """.stripMargin
-                CarbonToSparkAdapter.createExprCode(
-                  code,
-                  s"$value.getIsNull()",
-                  s"((Long)$value.getValue())",
-                  expr.dataType)
+                ExprCode(code, s"$value.getIsNull()", s"((Long)$value.getValue())")
               case CarbonDataTypes.BOOLEAN => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeBool($dictRef, ${ ev.value });
                  """.stripMargin
-                CarbonToSparkAdapter.createExprCode(
-                  code,
-                  s"$value.getIsNull()",
-                  s"((Boolean)$value.getValue())",
-                  expr.dataType)
+                ExprCode(code, s"$value.getIsNull()", s"((Boolean)$value.getValue())")
               case _ => code +=
                 s"""
                    |org.apache.spark.sql.DictTuple $value = $decodeStr($dictRef, ${ev.value});
                  """.stripMargin
-                CarbonToSparkAdapter.createExprCode(
-                  code,
-                  s"$value.getIsNull()",
-                  s"((UTF8String)$value.getValue())",
-                  expr.dataType)
+                ExprCode(code, s"$value.getIsNull()", s"((UTF8String)$value.getValue())")
+
             }
           }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 703df20..376d121 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -40,9 +40,11 @@
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamingQueryListener, StreamSinkFactory}
@@ -181,9 +183,12 @@
         LOGGER.warn("Carbon Table [" +dbName +"] [" +tableName +"] is not found, " +
           "Now existing Schema will be overwritten with default properties")
         val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-        val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
+        val identifier = AbsoluteTableIdentifier.from(
+          CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),
+          dbName,
+          tableName)
         val updatedParams = CarbonSource.updateAndCreateTable(
-          dbName, tableName, tablePath, dataSchema, sparkSession, metaStore, parameters, None)
+          identifier, dataSchema, sparkSession, metaStore, parameters, None)
         (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
@@ -273,10 +278,9 @@
   lazy val listenerAdded = new mutable.HashMap[Int, Boolean]()
 
   def createTableInfoFromParams(
-      databaseName: String,
-      tableName: String,
       parameters: Map[String, String],
       dataSchema: StructType,
+      identifier: AbsoluteTableIdentifier,
       query: Option[LogicalPlan],
       sparkSession: SparkSession): TableModel = {
     val sqlParser = new CarbonSpark2SqlParser
@@ -297,8 +301,8 @@
         sqlParser.getFields(dataSchema)
     }
     val bucketFields = sqlParser.getBucketFields(map, fields, options)
-    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(databaseName),
-      tableName, fields, Nil, map, bucketFields)
+    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
+      identifier.getTableName, fields, Nil, map, bucketFields)
   }
 
   /**
@@ -310,8 +314,7 @@
   def updateCatalogTableWithCarbonSchema(
       tableDesc: CatalogTable,
       sparkSession: SparkSession,
-      query: Option[LogicalPlan] = None,
-      persistSchema: Boolean = true): CatalogTable = {
+      query: Option[LogicalPlan] = None): CatalogTable = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
@@ -319,16 +322,14 @@
       val tablePath = CarbonEnv.getTablePath(
         tableDesc.identifier.database, tableDesc.identifier.table)(sparkSession)
       val dbName = CarbonEnv.getDatabaseName(tableDesc.identifier.database)(sparkSession)
+      val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableDesc.identifier.table)
       val map = updateAndCreateTable(
-        dbName,
-        tableDesc.identifier.table,
-        tablePath,
+        identifier,
         tableDesc.schema,
         sparkSession,
         metaStore,
         properties,
-        query,
-        persistSchema)
+        query)
       // updating params
       val updatedFormat = CarbonToSparkAdapter
         .getUpdatedStorageFormat(storageFormat, map, tablePath)
@@ -350,56 +351,36 @@
     }
   }
 
-  def createTableInfo(
-      databaseName: String,
-      tableName: String,
-      tablePath: String,
-      dataSchema: StructType,
-      properties: Map[String, String],
-      query: Option[LogicalPlan],
-      sparkSession: SparkSession
-  ): TableInfo = {
-    val model = createTableInfoFromParams(
-      databaseName, tableName, properties, dataSchema, query, sparkSession)
-    val tableInfo = TableNewProcessor(model)
-    val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
-    tableInfo.setTablePath(tablePath)
-    tableInfo.setTransactionalTable(isTransactionalTable)
-    tableInfo.setDatabaseName(databaseName)
-    val schemaEvolutionEntry = new SchemaEvolutionEntry
-    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    tableInfo
-  }
-
   def updateAndCreateTable(
-      databaseName: String,
-      tableName: String,
-      tablePath: String,
+      identifier: AbsoluteTableIdentifier,
       dataSchema: StructType,
       sparkSession: SparkSession,
       metaStore: CarbonMetaStore,
       properties: Map[String, String],
-      query: Option[LogicalPlan],
-      persistSchema: Boolean = true): Map[String, String] = {
-    val tableInfo = createTableInfo(
-      databaseName, tableName, tablePath, dataSchema, properties, query, sparkSession)
-   val map = if (persistSchema &&
-                 !metaStore.isReadFromHiveMetaStore &&
-                 tableInfo.isTransactionalTable) {
-      metaStore.saveToDisk(tableInfo, tablePath)
+      query: Option[LogicalPlan]): Map[String, String] = {
+    val model = createTableInfoFromParams(properties, dataSchema, identifier, query, sparkSession)
+    val tableInfo: TableInfo = TableNewProcessor(model)
+    val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
+    tableInfo.setTablePath(identifier.getTablePath)
+    tableInfo.setTransactionalTable(isTransactionalTable)
+    tableInfo.setDatabaseName(identifier.getDatabaseName)
+    val schemaEvolutionEntry = new SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    val map = if (!metaStore.isReadFromHiveMetaStore && isTransactionalTable) {
+      metaStore.saveToDisk(tableInfo, identifier.getTablePath)
       new java.util.HashMap[String, String]()
     } else {
       CarbonUtil.convertToMultiStringMap(tableInfo)
     }
     properties.foreach(e => map.put(e._1, e._2))
-    map.put("tablepath", tablePath)
-    map.put("dbname", databaseName)
+    map.put("tablepath", identifier.getTablePath)
+    map.put("dbname", identifier.getDatabaseName)
     if (map.containsKey("tableName")) {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
       LOGGER.warn("tableName is not required in options, ignoring it")
     }
-    map.put("tableName", tableName)
+    map.put("tableName", identifier.getTableName)
     map.asScala.toMap
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
index 233f28d..8a37989 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -38,5 +38,5 @@
 
   override def genCode(ctx: CodegenContext): ExprCode = nonDt.genCode(ctx)
 
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy()
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 9f203c8..ded87b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -164,7 +164,6 @@
   }
 
   private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
-    LOGGER.info("Trying to drop DataMap from system folder")
     try {
       if (dataMapSchema == null) {
         dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b90faa7..130580d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -690,13 +690,13 @@
             attr.dataType
         }
       }
-      CarbonToSparkAdapter.createAttributeReference(
-        attr.name,
+      CarbonToSparkAdapter.createAttributeReference(attr.name,
         updatedDataType,
         attr.nullable,
         attr.metadata,
         attr.exprId,
-        attr.qualifier)
+        attr.qualifier,
+        attr)
     }
     // Only select the required columns
     var output = if (partition.nonEmpty) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index d3f8079..c12ff6c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -97,12 +97,12 @@
           carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
-      // when we call
+      // In case of spark2.2 and above and , when we call
       // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
       // in case of add column, spark gets the catalog table and then it itself adds the partition
       // columns if the table is partition table for all the new data schema sent by carbon,
       // so there will be duplicate partition columns, so send the columns without partition columns
-      val cols = if (carbonTable.isHivePartitionTable) {
+      val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
         val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
         val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains
         (col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index cf05a9d..7e66d34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -163,7 +163,7 @@
       if (isDataTypeChange) {
         // if column datatype change operation is on partition column, then fail the datatype change
         // operation
-        if (null != carbonTable.getPartitionInfo) {
+        if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
           val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
           partitionColumns.asScala.foreach {
             col =>
@@ -286,13 +286,14 @@
     // update the schema changed column at the specific index in carbonColumns based on schema order
     carbonColumns
       .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
-    // When we call
+    // In case of spark2.2 and above and , when we call
     // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
     // in case of rename column or change datatype, spark gets the catalog table and then it itself
     // adds the partition columns if the table is partition table for all the new data schema sent
     // by carbon, so there will be duplicate partition columns, so send the columns without
     // partition columns
-    val columns = if (carbonTable.isHivePartitionTable) {
+    val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                      carbonTable.isHivePartitionTable) {
       val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
       val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains(
         col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 4cb8a2e..bdc0228 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -151,12 +151,13 @@
       val cols = carbonTable.getCreateOrderColumn().asScala
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
         .filterNot(column => delCols.contains(column))
-      // When we call
+      // In case of spark2.2 and above and , when we call
       // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
       // in case of drop column, spark gets the catalog table and then it itself adds the partition
       // columns if the table is partition table for all the new data schema sent by carbon,
       // so there will be duplicate partition columns, so send the columns without partition columns
-      val columns = if (carbonTable.isHivePartitionTable) {
+      val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                        carbonTable.isHivePartitionTable) {
         val partitionColumns = partitionInfo.getColumnSchemaList.asScala
         val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
         Some(carbonColumnsWithoutPartition)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
deleted file mode 100644
index f36e1ac..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.table
-
-import org.apache.spark.sql.{CarbonEnv, CarbonSource, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, MetadataCommand}
-
-/**
- * Command to create table in case of 'USING CARBONDATA' DDL
- *
- * @param catalogTable catalog table created by spark
- * @param ignoreIfExists ignore if table exists
- * @param sparkSession spark session
- */
-case class CarbonCreateDataSourceTableCommand(
-    catalogTable: CatalogTable,
-    ignoreIfExists: Boolean,
-    sparkSession: SparkSession)
-  extends MetadataCommand {
-
-  override def processMetadata(session: SparkSession): Seq[Row] = {
-    // Run the spark command to create table in metastore before saving carbon schema
-    // in table path.
-    // This is required for spark 2.4, because spark 2.4 will fail to create table
-    // if table path is created before hand
-    val updatedCatalogTable =
-      CarbonSource.updateCatalogTableWithCarbonSchema(catalogTable, session, None, false)
-    val sparkCommand = CreateDataSourceTableCommand(updatedCatalogTable, ignoreIfExists)
-    sparkCommand.run(session)
-
-    // save the table info (carbondata's schema) in table path
-    val tableName = catalogTable.identifier.table
-    val dbName = CarbonEnv.getDatabaseName(catalogTable.identifier.database)(session)
-    val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(session)
-    val tableInfo = CarbonSource.createTableInfo(
-      dbName, tableName, tablePath, catalogTable.schema, catalogTable.properties, None, session)
-    val metastore = CarbonEnv.getInstance(session).carbonMetaStore
-    metastore.saveToDisk(tableInfo, tablePath)
-
-    Seq.empty
-  }
-
-  override protected def opName: String = "CREATE TABLE USING CARBONDATA"
-
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 2a5d78b..4adb1aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -207,9 +207,8 @@
             } catch {
               case _: Exception => // No operation
             }
-            throw e
             val msg = s"Create table'$tableName' in database '$dbName' failed"
-            throwMetadataException(dbName, tableName, s"$msg, ${e.getMessage}")
+            throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
         }
       }
       val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 8700b29..54a5757 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -134,7 +134,6 @@
       }
       val indexDatamapSchemas =
         DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
-      LOGGER.info(s"Dropping DataMaps in table $tableName, size: " + indexDatamapSchemas.size())
       if (!indexDatamapSchemas.isEmpty) {
         childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
           val command = CarbonDropDataMapCommand(schema.getDataMapName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 8acc749..2e1f91f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -142,13 +142,10 @@
     val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
       logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
     val attrs = projectExprsNeedToDecode.map { attr =>
-      val newAttr = CarbonToSparkAdapter.createAttributeReference(
-        attr.name,
+      val newAttr = AttributeReference(attr.name,
         attr.dataType,
         attr.nullable,
-        attr.metadata,
-        attr.exprId,
-        Option(table.carbonRelation.tableName))
+        attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
       relation.addAttribute(newAttr)
       newAttr
     }
@@ -197,7 +194,8 @@
               attr.nullable,
               attr.metadata,
               attr.exprId,
-              attr.qualifier)
+              attr.qualifier,
+              attr)
         }
       }
       partitions =
@@ -405,7 +403,7 @@
             newProjectList :+= reference
             a.transform {
               case s: ScalaUDF =>
-                CarbonToSparkAdapter.createScalaUDF(s, reference)
+                ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
             }
           case other => other
       }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 49a3d3b..a851bc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,7 @@
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
 import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand}
 import org.apache.spark.sql.execution.command.schema._
-import org.apache.spark.sql.execution.command.table.{CarbonCreateDataSourceTableCommand, CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
@@ -171,7 +171,7 @@
             ExecutedCommandExec(addColumn) :: Nil
           }
           // TODO: remove this else if check once the 2.1 version is unsupported by carbon
-        } else {
+        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
           val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
             .map {
               a =>
@@ -185,6 +185,8 @@
             .invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
           Nil
           // TODO: remove this else check once the 2.1 version is unsupported by carbon
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
       case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -273,12 +275,9 @@
         if table.provider.get != DDLUtils.HIVE_PROVIDER
           && (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
           || table.provider.get.equalsIgnoreCase("carbondata")) =>
-        val cmd = if (SparkUtil.isSparkVersionEqualTo("2.4")) {
-          CarbonCreateDataSourceTableCommand(table, ignoreIfExists, sparkSession)
-        } else {
-          val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
-          CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
-        }
+        val updatedCatalog = CarbonSource
+          .updateCatalogTableWithCarbonSchema(table, sparkSession)
+        val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
         ExecutedCommandExec(cmd) :: Nil
       case AlterTableSetPropertiesCommand(tableName, properties, isView)
         if CarbonEnv.getInstance(sparkSession).carbonMetaStore
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index 5e82eb3..fd7defa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -26,8 +26,8 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index eda131f..5323293 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -78,7 +78,9 @@
             "Update operation is not supported for mv datamap table")
         }
       }
-      val tableRelation =
+      val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+        relation
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         alias match {
           case Some(_) =>
             CarbonReflectionUtils.getSubqueryAlias(
@@ -88,6 +90,9 @@
               Some(table.tableIdentifier))
           case _ => relation
         }
+      } else {
+        throw new UnsupportedOperationException("Unsupported Spark version.")
+      }
 
       CarbonReflectionUtils.getSubqueryAlias(
         sparkSession,
@@ -216,15 +221,21 @@
           }
         }
         // include tuple id in subquery
-        alias match {
-          case Some(_) =>
-            val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
-              sparkSession,
-              alias,
-              relation,
-              Some(table.tableIdentifier))
-            Project(projList, subqueryAlias)
-          case _ => Project(projList, relation)
+        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+          Project(projList, relation)
+        } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+          alias match {
+            case Some(_) =>
+              val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
+                sparkSession,
+                alias,
+                relation,
+                Some(table.tableIdentifier))
+              Project(projList, subqueryAlias)
+            case _ => Project(projList, relation)
+          }
+        } else {
+          throw new UnsupportedOperationException("Unsupported Spark version.")
         }
     }
     CarbonProjectForDeleteCommand(
@@ -303,7 +314,13 @@
         }
       }
       val newChild: LogicalPlan = if (newChildOutput == child.output) {
-        throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
+        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+          CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
+        } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+          CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
+        } else {
+          throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
+        }
       } else {
         Project(newChildOutput, child)
       }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 849d7ba..b2ba7f4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -200,7 +200,8 @@
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
       case SubqueryAlias(_, c)
-        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
+           (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
@@ -523,7 +524,8 @@
       carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDataSourceHadoopRelation
       case SubqueryAlias(_, c)
-        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
+           (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName
               .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 68c293b..9b3ff87 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -18,11 +18,11 @@
 
 import java.util.LinkedHashSet
 
+import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CarbonMetastoreTypes, SparkTypeConverter}
@@ -119,8 +119,7 @@
             val dataType = SparkTypeConverter.addDecimalScaleAndPrecision(column, dType)
             CarbonMetastoreTypes.toDataType(dataType)
         }
-        CarbonToSparkAdapter.createAttributeReference(
-          column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
+        AttributeReference(column.getColName, output, nullable = true )(
           qualifier = Option(tableName + "." + column.getColName))
       } else {
         val output = CarbonMetastoreTypes.toDataType {
@@ -130,8 +129,7 @@
             case others => others
           }
         }
-        CarbonToSparkAdapter.createAttributeReference(
-          column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
+        AttributeReference(column.getColName, output, nullable = true)(
           qualifier = Option(tableName + "." + column.getColName))
       }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index dcba730..20d43df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -26,9 +26,11 @@
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
 
 /**
- * This interface defines those common api used by carbon for spark integration,
+ * This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration,
  * but are not defined in SessionCatalog or HiveSessionCatalog to give contract to the
  * Concrete implementation classes.
+ * For example CarbonSessionCatalog defined in 2.1 and 2.2.
+ *
  */
 @InterfaceAudience.Internal
 @InterfaceStability.Stable
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index e288e6d..2765c5f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -315,13 +315,15 @@
    * @return
    */
   def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
-    val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
-      .asInstanceOf[Option[Expression]]
-    if (trimStr.isDefined) {
-      false
-    } else {
-      true
+    var isCompatible = true
+    if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+        .asInstanceOf[Option[Expression]]
+      if (trimStr.isDefined) {
+        isCompatible = false
+      }
     }
+    isCompatible
   }
 
   def transformExpression(expr: Expression): CarbonExpression = {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4866301..10b661a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -305,7 +305,7 @@
           }
         }
         val (selectStmt, relation) =
-          if (selectPattern.findFirstIn(sel.toLowerCase).isEmpty ||
+          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
               !StringUtils.isEmpty(subQueryResults)) {
             // if subQueryResults are not empty means, it is not join with main table.
             // so use subQueryResults in update with value flow.
@@ -348,6 +348,8 @@
         rel
     }
 
+
+
   private def updateRelation(
       r: UnresolvedRelation,
       tableIdent: Seq[String],
@@ -360,6 +362,8 @@
           case Seq(dbName, tableName) => Some(tableName)
           case Seq(tableName) => Some(tableName)
         }
+        // Use Reflection to choose between Spark2.1 and Spark2.2
+        // Move UnresolvedRelation(tableIdentifier, tableAlias) to reflection.
         CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias)
     }
   }
@@ -382,6 +386,8 @@
 
         val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
 
+        // Use Reflection to choose between Spark2.1 and Spark2.2
+        // Move (UnresolvedRelation(tableIdent, alias), tableIdent, alias) to reflection.
         val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias)
 
         (unresolvedRelation, tableIdent, alias, tableIdentifier)
@@ -672,7 +678,6 @@
     val name = field.column.toLowerCase
     field.copy(column = name, name = Some(name))
   }
-
   protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ DROP ~ COLUMNS ~
     ("(" ~> rep1sep(ident, ",") <~ ")") <~ opt(";") ^^ {
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..79a6240
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+        SparkSession.sqlListener.set(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier,attrRef.isGenerated)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated:Boolean = if (namedExpr.isDefined) {
+      namedExpr.get.isGenerated
+    } else {
+      false
+    }
+    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation)
+  }
+
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(tablePath))
+  }
+
+  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq(OptimizeCodegen(conf))
+  }
+}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
similarity index 88%
copy from integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
copy to integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
index 7b20c06..69ed477 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
 
 object MixedFormatHandlerUtil {
@@ -33,13 +32,13 @@
       dataFilters: Seq[Expression],
       tableIdentifier: Option[TableIdentifier]
   ): FileSourceScanExec = {
+    val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
     FileSourceScanExec(
       relation,
       output,
       outputSchema,
       partitionFilters,
-      None,
-      dataFilters,
+      pushedDownFilters,
       tableIdentifier)
   }
 }
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
new file mode 100644
index 0000000..eb3e88d
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import com.google.common.base.Objects
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+  * A `LogicalPlan` that represents a hive table.
+  *
+  * TODO: remove this after we completely make hive as a data source.
+  */
+case class HiveTableRelation(
+                              tableMeta: CatalogTable,
+                              dataCols: Seq[AttributeReference],
+                              partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
+  assert(tableMeta.identifier.database.isDefined)
+  assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
+  assert(tableMeta.schema.sameType(dataCols.toStructType))
+
+  // The partition column should always appear after data columns.
+  override def output: Seq[AttributeReference] = dataCols ++ partitionCols
+
+  def isPartitioned: Boolean = partitionCols.nonEmpty
+
+  override def equals(relation: Any): Boolean = relation match {
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(tableMeta.identifier, output)
+  }
+
+  override def newInstance(): HiveTableRelation = copy(
+    dataCols = dataCols.map(_.newInstance()),
+    partitionCols = partitionCols.map(_.newInstance()))
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
new file mode 100644
index 0000000..9a88255
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class MigrateOptimizer {
+
+}
+
+/**
+  * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
+  */
+object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Deduplicate(keys, child, streaming) if !streaming =>
+      val keyExprIds = keys.map(_.exprId)
+      val aggCols = child.output.map { attr =>
+        if (keyExprIds.contains(attr.exprId)) {
+          attr
+        } else {
+          Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
+        }
+      }
+      Aggregate(keys, aggCols, child)
+  }
+}
+
+/** A logical plan for `dropDuplicates`. */
+case class Deduplicate(
+                        keys: Seq[Attribute],
+                        child: LogicalPlan,
+                        streaming: Boolean) extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+  * Remove projections from the query plan that do not make any modifications.
+  */
+object RemoveRedundantProject extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case p @ Project(_, child) if p.output == child.output => child
+  }
+}
+
+/**
+  * push down operations into [[CreateNamedStructLike]].
+  */
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformExpressionsUp {
+      // push down field extraction
+      case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
+        createNamedStructLike.valExprs(ordinal)
+    }
+  }
+}
+
+/**
+  * push down operations into [[CreateArray]].
+  */
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformExpressionsUp {
+      // push down field selection (array of structs)
+      case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
+        // instead f selecting the field on the entire array,
+        // select it from each member of the array.
+        // pushing down the operation this way open other optimizations opportunities
+        // (i.e. struct(...,x,...).x)
+        CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
+      // push down item selection.
+      case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+        // instead of creating the array and then selecting one row,
+        // remove array creation altgether.
+        if (idx >= 0 && idx < elems.size) {
+          // valid index
+          elems(idx)
+        } else {
+          // out of bounds, mimic the runtime behavior and return null
+          Literal(null, ga.dataType)
+        }
+    }
+  }
+}
+
+/**
+  * push down operations into [[CreateMap]].
+  */
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformExpressionsUp {
+      case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
+    }
+  }
+}
+
+
+/**
+  * Removes MapObjects when the following conditions are satisfied
+  *   1. Mapobject(... lambdavariable(..., false) ...), which means types for input and output
+  *      are primitive types with non-nullable
+  *   2. no custom collection class specified representation of data item.
+  */
+object EliminateMapObjects extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData
+  }
+}
diff --git a/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
similarity index 65%
rename from integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
rename to integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
index a2ca9e6..4abf189 100644
--- a/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.spark.adapter
+package org.apache.spark.sql.catalyst.plans.logical
 
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.sql.catalyst.expressions.Attribute
 
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
-
-object CarbonToSparkAdapter {
-  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
-    FilePartition(index, files.toArray.toSeq)
-  }
-}
+/**
+  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
+  * recognize a subquery as such, and it allows us to write subquery aware transformations.
+  */
+case class Subquery(child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
new file mode 100644
index 0000000..8eb05fc
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * To initialize dynamic values default param
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * To initialize dynamic param defaults along with usage docs
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+        SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To set carbon task distribution.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    val BAD_RECORDS_LOGGER_ENABLE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide weather empty data to be considered bad/ good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BAD_RECORD_PATH =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
+      "carbon.input.segments.<database_name>.<table_name>")
+      .doc("Property to configure the list of segments to query.").stringConf
+      .createWithDefault(carbonProperties
+        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+  }
+  /**
+   * to set the dynamic properties default values
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..6b94806
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.spark.util.CarbonReflectionUtils
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    sparkSession: SparkSession,
+    functionResourceLoader: FunctionResourceLoader,
+    functionRegistry: FunctionRegistry,
+    conf: SQLConf,
+    hadoopConf: Configuration)
+  extends HiveSessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    sparkSession,
+    functionResourceLoader,
+    functionRegistry,
+    conf,
+    hadoopConf) with CarbonSessionCatalog {
+
+  private lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+  /**
+   * return's the carbonEnv instance
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.init
+
+  /**
+   * This method will invalidate carbonrelation from cache if carbon table is updated in
+   * carbon catalog
+   *
+   * @param name
+   * @param alias
+   * @return
+   */
+  override def lookupRelation(name: TableIdentifier,
+      alias: Option[String]): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name, alias)
+    var toRefreshRelation = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
+        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
+      case _ =>
+    }
+
+    if (toRefreshRelation) {
+      super.lookupRelation(name, alias)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * returns hive client from session state
+   *
+   * @return
+   */
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is alternate way of getting partition information. It first fetches all partitions from
+   * hive and then apply filter instead of querying hive along with filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def getPartitionsAlternate(
+      partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
+    val partitionSchema = catalogTable.partitionSchema
+    if (partitionFilters.nonEmpty) {
+      val boundPredicate =
+        InterpretedPredicate.create(partitionFilters.reduce(And).transform {
+          case att: AttributeReference =>
+            val index = partitionSchema.indexWhere(_.name == att.name)
+            BoundReference(index, partitionSchema(index).dataType, nullable = true)
+        })
+      allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
+    } else {
+      allPartitions
+    }
+  }
+
+  /**
+   * Update the storageformat with new location information
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toString))
+  }
+}
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ * @param sparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies = extraStrategies
+
+  experimentalMethods.extraOptimizations = extraOptimizations
+
+  def extraStrategies: Seq[Strategy] = {
+    Seq(
+      new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  }
+
+  def extraOptimizations: Seq[Rule[LogicalPlan]] = {
+    Seq(new CarbonIUDRule,
+      new CarbonUDFTransformRule,
+      new CarbonLateDecodeRule)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
+  def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
+    catalog.ParquetConversions ::
+    catalog.OrcConversions ::
+    CarbonPreInsertionCasts(sparkSession) ::
+    CarbonIUDAnalysisRule(sparkSession) ::
+    AnalyzeCreateTable(sparkSession) ::
+    PreprocessTableInsertion(conf) ::
+    DataSourceAnalysis(conf) ::
+    (if (conf.runSQLonFile) {
+      new ResolveDataSource(sparkSession) :: Nil
+    } else {  Nil })
+  }
+
+  override lazy val analyzer: Analyzer =
+    new CarbonAnalyzer(catalog, conf, sparkSession,
+      new Analyzer(catalog, conf) {
+        override val extendedResolutionRules =
+          if (extendedAnalyzerRules.nonEmpty) {
+            extendedAnalyzerRules ++ internalAnalyzerRules
+          } else {
+            internalAnalyzerRules
+          }
+        override val extendedCheckRules = Seq(
+          PreWriteCheck(conf, catalog))
+      }
+  )
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override lazy val catalog = {
+    new CarbonHiveSessionCatalog(
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+      sparkSession.sharedState.globalTempViewManager,
+      sparkSession,
+      functionResourceLoader,
+      functionRegistry,
+      conf,
+      newHadoopConf())
+  }
+}
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: CatalystConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+  val mvPlan = try {
+    CarbonReflectionUtils.createObject(
+      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
+      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
+  } catch {
+    case e: Exception =>
+      null
+  }
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val logicalPlan = analyzer.execute(plan)
+    if (mvPlan != null) {
+      mvPlan.apply(logicalPlan)
+    } else {
+      logicalPlan
+    }
+  }
+}
+
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+    super.execute(transFormedPlan)
+  }
+}
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
+    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule,
+    // And optimize whole plan at once.
+    val transFormedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case p: PredicateSubquery =>
+            val tPlan = p.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
+        }
+    }
+    transFormedPlan
+  }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
+
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("carbondata") ||
+        fileStorage.equalsIgnoreCase("'carbonfile'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
+        ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+        Option(ctx.STRING()).map(string),
+        ctx.AS, ctx.query, fileStorage)
+        helper.createCarbonTable(createTableTuple)
+    } else {
+      super.visitCreateTable(ctx)
+    }
+  }
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
+    withOrigin(ctx) {
+      if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
+          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
+        super.visitShowTables(ctx)
+      } else {
+        CarbonShowTablesCommand(
+          Option(ctx.db).map(_.getText),
+          Option(ctx.pattern).map(string))
+      }
+    }
+  }
+
+  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
+    CarbonExplainCommand(super.visitExplain(ctx))
+  }
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..dd690e4
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,165 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{
+  CatalogRelation, CatalogTable, CatalogTableType,
+  SimpleCatalogRelation
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{
+  AlterTableRecoverPartitionsCommand, DDLUtils,
+  RunnableCommand
+}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ * @param table the Catalog Table
+ * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
+ * @param query the query whose result will be insert into the new relation
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    assert(table.schema.isEmpty)
+
+    val provider = table.provider.get
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    var createMetastoreTable = false
+    var existingSchema = Option.empty[StructType]
+    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+      // Check if we need to throw an exception or just return.
+      mode match {
+        case SaveMode.ErrorIfExists =>
+          throw new AnalysisException(s"Table $tableName already exists. " +
+                                      s"If you are using saveAsTable, you can set SaveMode to " +
+                                      s"SaveMode.Append to " +
+                                      s"insert data into the table or set SaveMode to SaveMode" +
+                                      s".Overwrite to overwrite" +
+                                      s"the existing data. " +
+                                      s"Or, if you are using SQL CREATE TABLE, you need to drop " +
+                                      s"$tableName first.")
+        case SaveMode.Ignore =>
+          // Since the table already exists and the save mode is Ignore, we will just return.
+          return Seq.empty[Row]
+        case SaveMode.Append =>
+          // Check if the specified data source match the data source of the existing table.
+          val existingProvider = DataSource.lookupDataSource(provider)
+          // TODO: Check that options from the resolved relation match the relation that we are
+          // inserting into (i.e. using the same compression).
+
+          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+          // views unexpectedly.
+          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+            case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+              // check if the file formats match
+              l.relation match {
+                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
+                  throw new AnalysisException(
+                    s"The file format of the existing table $tableName is " +
+                    s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
+                    s"format `$provider`")
+                case _ =>
+              }
+              if (query.schema.size != l.schema.size) {
+                throw new AnalysisException(
+                  s"The column number of the existing schema[${ l.schema }] " +
+                  s"doesn't match the data schema[${ query.schema }]'s")
+              }
+              existingSchema = Some(l.schema)
+            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+              existingSchema = Some(s.metadata.schema)
+            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
+              throw new AnalysisException("Saving data in the Hive serde table " +
+                                          s"${ c.catalogTable.identifier } is not supported yet. " +
+                                          s"Please use the insertInto() API as an alternative..")
+            case o =>
+              throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
+          }
+        case SaveMode.Overwrite =>
+          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+          // Need to create the table again.
+          createMetastoreTable = true
+      }
+    } else {
+      // The table does not exist. We need to create it in metastore.
+      createMetastoreTable = true
+    }
+
+    val data = Dataset.ofRows(sparkSession, query)
+    val df = existingSchema match {
+      // If we are inserting into an existing table, just use the existing schema.
+      case Some(s) => data.selectExpr(s.fieldNames: _*)
+      case None => data
+    }
+
+    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+      Some(sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.locationUri
+    }
+
+    // Create the relation based on the data of df.
+    val pathOption = tableLocation.map("path" -> _)
+    val dataSource = DataSource(
+      sparkSession,
+      className = provider,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
+
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table $tableName in $mode mode", ex)
+        throw ex
+    }
+    result match {
+      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                   sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        // Need to recover partitions into the metastore so our saved data is visible.
+        sparkSession.sessionState.executePlan(
+          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+      case _ =>
+    }
+
+    // Refresh the cache of the table in the catalog.
+    sessionState.catalog.refreshTable(tableIdentWithDB)
+    Seq.empty[Row]
+  }
+}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
new file mode 100644
index 0000000..446b5a5
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+        SparkSession.sqlListener.set(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata,exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier,attrRef.isGenerated)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated:Boolean = if (namedExpr.isDefined) {
+      namedExpr.get.isGenerated
+    } else {
+      false
+    }
+    Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation)
+  }
+
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+
+  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq(OptimizeCodegen(conf))
+  }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+  }
+}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
similarity index 95%
rename from integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
rename to integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
index 808099d..2f9ad3e 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -23,14 +23,15 @@
 import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.internal.SQLConf
 
+
 class CarbonOptimizer(
     catalog: SessionCatalog,
     conf: SQLConf,
     experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, experimentalMethods) {
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
     val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
     super.execute(transFormedPlan)
   }
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
similarity index 89%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
copy to integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index 36ec2c8..7177f1a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -31,15 +31,15 @@
   val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat(0))
+    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
         fileStorage.equalsIgnoreCase("carbondata") ||
         fileStorage.equalsIgnoreCase("'carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
-        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
-        ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
+        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
+        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
       helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateHiveTable(ctx)
@@ -47,6 +47,6 @@
   }
 
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser, ctx)
+    visitAddTableColumns(parser,ctx)
   }
 }
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
deleted file mode 100644
index d134de1..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
-
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
-  extends LeafExpression with NamedExpression with CodegenFallback {
-
-  type EvaluatedType = Any
-
-  override def toString: String = s"input[" + colExp.getColIndex + "]"
-
-  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
-  override def name: String = colExp.getColumnName
-
-  override def toAttribute: Attribute = throw new UnsupportedOperationException
-
-  override def exprId: ExprId = throw new UnsupportedOperationException
-
-  override def qualifier: Option[String] = null
-
-  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 7003c26..9094dfe 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -23,11 +23,11 @@
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, Metadata}
 
 object CarbonToSparkAdapter {
@@ -41,8 +41,8 @@
   }
 
   def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
-      metadata: Metadata, exprId: ExprId, qualifier: Option[String],
-      attrRef : NamedExpression = null): AttributeReference = {
+                               metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
     AttributeReference(
       name,
       dataType,
@@ -50,23 +50,14 @@
       metadata)(exprId, qualifier)
   }
 
-  def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
-    ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
-  }
-
-  def createExprCode(code: String, isNull: String, value: String, dataType: DataType = null
-  ): ExprCode = {
-    ExprCode(code, isNull, value)
-  }
-
   def createAliasRef(child: Expression,
-      name: String,
-      exprId: ExprId = NamedExpression.newExprId,
-      qualifier: Option[String] = None,
-      explicitMetadata: Option[Metadata] = None,
-      namedExpr : Option[NamedExpression] = None ) : Alias = {
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr : Option[NamedExpression] = None ) : Alias = {
 
-    Alias(child, name)(exprId, qualifier, explicitMetadata)
+      Alias(child, name)(exprId, qualifier, explicitMetadata)
   }
 
   def getExplainCommandObj() : ExplainCommand = {
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
deleted file mode 100644
index bffb900..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.types.StructType
-
-object MixedFormatHandlerUtil {
-
-  def getScanForSegments(
-      @transient relation: HadoopFsRelation,
-      output: Seq[Attribute],
-      outputSchema: StructType,
-      partitionFilters: Seq[Expression],
-      dataFilters: Seq[Expression],
-      tableIdentifier: Option[TableIdentifier]
-  ): FileSourceScanExec = {
-    FileSourceScanExec(
-      relation,
-      output,
-      outputSchema,
-      partitionFilters,
-      dataFilters,
-      tableIdentifier)
-  }
-}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 25d5543..5435f04 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
similarity index 93%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
rename to integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index 36ec2c8..73b6790 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -38,8 +38,8 @@
         fileStorage.equalsIgnoreCase("'carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
       val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
-        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
-        ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),ctx.locationSpec(0),
+        Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
       helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateHiveTable(ctx)
@@ -47,6 +47,6 @@
   }
 
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    visitAddTableColumns(parser, ctx)
+    visitAddTableColumns(parser,ctx)
   }
 }
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
deleted file mode 100644
index 61959c2..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager}
-import org.apache.spark.sql.hive.client.HiveClient
-
-object SparkAdapter {
-  def getExternalCatalogCatalog(catalog: HiveExternalCatalog) = catalog
-
-  def getGlobalTempViewManager(manager: GlobalTempViewManager) = manager
-
-  def getHiveClient(client: HiveClient) = client
-
-  def getHiveExternalCatalog(catalog: ExternalCatalog) = catalog.asInstanceOf[HiveExternalCatalog]
-}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
deleted file mode 100644
index ee4c9ce..0000000
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
-  extends LeafExpression with NamedExpression with CodegenFallback {
-
-  type EvaluatedType = Any
-
-  override def toString: String = s"input[" + colExp.getColIndex + "]"
-
-  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
-  override def name: String = colExp.getColumnName
-
-  override def toAttribute: Attribute = throw new UnsupportedOperationException
-
-  override def exprId: ExprId = throw new UnsupportedOperationException
-
-  override def qualifier: Seq[String] = null
-
-  override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
deleted file mode 100644
index 082d1ec..0000000
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
-  def addSparkListener(sparkContext: SparkContext) = {
-    sparkContext.addSparkListener(new SparkListener {
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-        SparkSession.setDefaultSession(null)
-      }
-    })
-  }
-
-  def createAttributeReference(
-      name: String,
-      dataType: DataType,
-      nullable: Boolean,
-      metadata: Metadata,
-      exprId: ExprId,
-      qualifier: Option[String],
-      attrRef : NamedExpression = null): AttributeReference = {
-    val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qf)
-  }
-
-  def createAttributeReference(
-      name: String,
-      dataType: DataType,
-      nullable: Boolean,
-      metadata: Metadata,
-      exprId: ExprId,
-      qualifier: Seq[String]): AttributeReference = {
-    AttributeReference(
-      name,
-      dataType,
-      nullable,
-      metadata)(exprId, qualifier)
-  }
-
-  def createScalaUDF(s: ScalaUDF, reference: AttributeReference) = {
-    ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
-  }
-
-  def createExprCode(code: String, isNull: String, value: String, dataType: DataType) = {
-    ExprCode(
-      code"$code",
-      JavaCode.isNullVariable(isNull),
-      JavaCode.variable(value, dataType))
-  }
-
-  def createAliasRef(
-      child: Expression,
-      name: String,
-      exprId: ExprId = NamedExpression.newExprId,
-      qualifier: Seq[String] = Seq.empty,
-      explicitMetadata: Option[Metadata] = None,
-      namedExpr: Option[NamedExpression] = None) : Alias = {
-    Alias(child, name)(exprId, qualifier, explicitMetadata)
-  }
-
-  def createAliasRef(
-      child: Expression,
-      name: String,
-      exprId: ExprId,
-      qualifier: Option[String]) : Alias = {
-    Alias(child, name)(exprId,
-      if (qualifier.isEmpty) Seq.empty else Seq(qualifier.get),
-      None)
-  }
-
-  def getExplainCommandObj() : ExplainCommand = {
-    ExplainCommand(OneRowRelation())
-  }
-
-  /**
-   * As a part of SPARK-24085 Hive tables supports scala subquery for
-   * parition tables, so Carbon also needs to supports
-   * @param partitionSet
-   * @param filterPredicates
-   * @return
-   */
-  def getPartitionKeyFilter(
-      partitionSet: AttributeSet,
-      filterPredicates: Seq[Expression]): ExpressionSet = {
-    ExpressionSet(
-      ExpressionSet(filterPredicates)
-        .filterNot(SubqueryExpression.hasSubquery)
-        .filter(_.references.subsetOf(partitionSet)))
-  }
-
-  // As per SPARK-22520 OptimizeCodegen is removed in 2.3.1
-  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
-    Seq.empty
-  }
-
-  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
-      map: Map[String, String],
-      tablePath: String): CatalogStorageFormat = {
-    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
-  }
-}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
deleted file mode 100644
index 8132188..0000000
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener, GlobalTempViewManager}
-import org.apache.spark.sql.hive.client.HiveClient
-
-object SparkAdapter {
-  def getExternalCatalogCatalog(catalog: HiveExternalCatalog) =
-    () => catalog
-
-  def getGlobalTempViewManager(manager: GlobalTempViewManager) =
-    () => manager
-
-  def getHiveClient(client: HiveClient) =
-    () => client
-
-  def getHiveExternalCatalog(catalog: ExternalCatalogWithListener) =
-    catalog.unwrapped.asInstanceOf[HiveExternalCatalog]
-}
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
index e7ca31b..c1a5862 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -24,7 +24,6 @@
   val dataMapName = "bloom_dm"
 
   override protected def beforeAll(): Unit = {
-    sqlContext.sparkContext.setLogLevel("info")
     deleteFile(bigFile)
     new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
     createFile(bigFile, line = 2000)
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
diff --git a/pom.xml b/pom.xml
index f74a11b..66214df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -487,7 +487,7 @@
 
   <profiles>
     <profile>
-      <!--This profile does not build spark module, so user should explicitly give spark profile -->
+      <!--This profile does not build spark module, so user should explicitly give spark profile also like spark-2.1 -->
       <id>build-with-format</id>
       <modules>
         <module>format</module>
@@ -501,6 +501,103 @@
       </properties>
     </profile>
     <profile>
+      <id>spark-2.1</id>
+      <properties>
+        <spark.version>2.1.0</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.eluder.coveralls</groupId>
+            <artifactId>coveralls-maven-plugin</artifactId>
+            <version>4.3.0</version>
+            <configuration>
+              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
+              <sourceEncoding>UTF-8</sourceEncoding>
+              <jacocoReports>
+                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
+                </jacocoReport>
+              </jacocoReports>
+              <sourceDirectories>
+                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
+              </sourceDirectories>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.2</id>
+      <properties>
+        <spark.version>2.2.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.eluder.coveralls</groupId>
+            <artifactId>coveralls-maven-plugin</artifactId>
+            <version>4.3.0</version>
+            <configuration>
+              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
+              <sourceEncoding>UTF-8</sourceEncoding>
+              <jacocoReports>
+                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
+                </jacocoReport>
+              </jacocoReports>
+              <sourceDirectories>
+                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
+                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
+                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
+              </sourceDirectories>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
       <id>spark-2.3</id>
       <activation>
         <activeByDefault>true</activeByDefault>
@@ -529,64 +626,11 @@
                 <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/below2.4</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.3</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/below2.4</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
-              </sourceDirectories>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>spark-2.4</id>
-      <properties>
-        <spark.version>2.4.4</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.8</scala.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.eluder.coveralls</groupId>
-            <artifactId>coveralls-maven-plugin</artifactId>
-            <version>4.3.0</version>
-            <configuration>
-              <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
-              <sourceEncoding>UTF-8</sourceEncoding>
-              <jacocoReports>
-                <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
-                </jacocoReport>
-              </jacocoReports>
-              <sourceDirectories>
-                <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.4</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/integration/spark-common/src/main/spark2.4</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>