Revert "[CARBONDATA-3514] Support Spark 2.4.4 integration"
This reverts commit ba35a02da4f2b2ab86cdafbfe60356134187dc57.
diff --git a/README.md b/README.md
index 2f661ed..a34784d 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,8 @@
## Status
-Spark2.3:
-[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.3)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
+Spark2.2:
+[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.2)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
[![Coverage Status](https://coveralls.io/repos/github/apache/carbondata/badge.svg?branch=master)](https://coveralls.io/github/apache/carbondata?branch=master)
<a href="https://scan.coverity.com/projects/carbondata">
<img alt="Coverity Scan Build Status"
diff --git a/build/README.md b/build/README.md
index 960ccce..f361a6e 100644
--- a/build/README.md
+++ b/build/README.md
@@ -25,9 +25,11 @@
* [Apache Thrift 0.9.3](http://archive.apache.org/dist/thrift/0.9.3/)
## Build command
-Build with different supported versions of Spark, by default using Spark 2.4.4
+Build with different supported versions of Spark, by default using Spark 2.2.1 to build
```
-mvn -DskipTests -Pspark-2.4 clean package
+mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package
+mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 clean package
+mvn -DskipTests -Pspark-2.3 -Dspark.version=2.3.2 clean package
```
Note:
@@ -37,5 +39,5 @@
## For contributors : To build the format code after any changes, please follow the below command.
Note: Need to install Apache Thrift 0.9.3
```
-mvn clean -DskipTests -Pbuild-with-format -Pspark-2.4 package
+mvn clean -DskipTests -Pbuild-with-format -Pspark-2.2 package
```
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index c18298d..d0e5a42 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -27,11 +27,11 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
-import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -42,15 +42,12 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.Logger;
/**
* Stores datamap schema in disk as json format
*/
public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStorageProvider {
- private Logger LOG = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
-
private String storePath;
private String mdtFilePath;
@@ -174,15 +171,17 @@
if (!FileFactory.isFileExist(schemaPath)) {
throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
}
-
- LOG.info(String.format("Trying to delete DataMap %s schema", dataMapName));
-
- dataMapSchemas.removeIf(schema -> schema.getDataMapName().equalsIgnoreCase(dataMapName));
+ Iterator<DataMapSchema> iterator = dataMapSchemas.iterator();
+ while (iterator.hasNext()) {
+ DataMapSchema schema = iterator.next();
+ if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) {
+ iterator.remove();
+ }
+ }
touchMDTFile();
if (!FileFactory.deleteFile(schemaPath)) {
throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
}
- LOG.info(String.format("DataMap %s schema is deleted", dataMapName));
}
private void checkAndReloadDataMapSchemas(boolean touchFile) throws IOException {
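The schema-removal change above swaps a Java 8 `removeIf` call (plus two log statements) for an explicit iterator loop. The two forms are behaviourally equivalent; a minimal Scala sketch of the iterator-based removal, using a hypothetical list of datamap names, looks like this:

```scala
import java.util.{ArrayList => JArrayList}

val dataMapNames = new JArrayList[String]()
dataMapNames.add("dm_sales")
dataMapNames.add("DM_AGG")

// Equivalent of dataMapNames.removeIf(n => n.equalsIgnoreCase("dm_agg")),
// written with an explicit iterator, as the restored Java code does.
val it = dataMapNames.iterator()
while (it.hasNext) {
  if (it.next().equalsIgnoreCase("dm_agg")) {
    it.remove()
  }
}
```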
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 08841b6..b32367b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -373,8 +373,7 @@
def updateColumnName(attr: Attribute, counter: Int): String = {
val name = getUpdatedName(attr.name, counter)
- val value = attr.qualifier.map(qualifier => qualifier + "_" + name)
- if (value.nonEmpty) value.head else name
+ attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
}
def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
@@ -474,7 +473,7 @@
}
def createAttrReference(ref: NamedExpression, name: String): Alias = {
- CarbonToSparkAdapter.createAliasRef(ref, name, exprId = ref.exprId)
+ Alias(ref, name)(exprId = ref.exprId, qualifier = None)
}
case class AttributeKey(exp: Expression) {
@@ -538,13 +537,13 @@
case attr: AttributeReference =>
val uattr = attrMap.get(AttributeKey(attr)).map{a =>
if (keepAlias) {
- CarbonToSparkAdapter.createAttributeReference(
- name = a.name,
- dataType = a.dataType,
- nullable = a.nullable,
- metadata = a.metadata,
- exprId = a.exprId,
- qualifier = attr.qualifier)
+ CarbonToSparkAdapter.createAttributeReference(a.name,
+ a.dataType,
+ a.nullable,
+ a.metadata,
+ a.exprId,
+ attr.qualifier,
+ a)
} else {
a
}
@@ -576,9 +575,9 @@
outputSel.zip(subsumerOutputList).map{ case (l, r) =>
l match {
case attr: AttributeReference =>
- CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
+ Alias(attr, r.name)(r.exprId, None)
case a@Alias(attr: AttributeReference, name) =>
- CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
+ Alias(attr, r.name)(r.exprId, None)
case other => other
}
}
@@ -595,13 +594,13 @@
val uattr = attrMap.get(AttributeKey(attr)).map{a =>
if (keepAlias) {
CarbonToSparkAdapter
- .createAttributeReference(
- a.name,
+ .createAttributeReference(a.name,
a.dataType,
a.nullable,
a.metadata,
a.exprId,
- attr.qualifier)
+ attr.qualifier,
+ a)
} else {
a
}
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index cff5c41..3ddb0fc 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -125,18 +125,17 @@
arrayBuffer += relation
}
var qualifier: Option[String] = None
- if (attr.qualifier.nonEmpty) {
- qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
+ if (attr.qualifier.isDefined) {
+ qualifier = if (attr.qualifier.get.startsWith("gen_sub")) {
Some(carbonTable.getTableName)
} else {
- attr.qualifier.headOption
+ attr.qualifier
}
}
fieldToDataMapFieldMap +=
- getFieldToDataMapFields(
- attr.name,
+ getFieldToDataMapFields(attr.name,
attr.dataType,
- qualifier.headOption,
+ qualifier,
"",
arrayBuffer,
carbonTable.getTableName)
@@ -249,8 +248,7 @@
/**
* Below method will be used to get the fields object for mv table
*/
- private def getFieldToDataMapFields(
- name: String,
+ private def getFieldToDataMapFields(name: String,
dataType: DataType,
qualifier: Option[String],
aggregateType: String,
@@ -315,7 +313,7 @@
val updatedOutList = outputList.map { col =>
val duplicateColumn = duplicateNameCols
.find(a => a.semanticEquals(col))
- val qualifiedName = col.qualifier.headOption.getOrElse(s"${ col.exprId.id }") + "_" + col.name
+ val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
if (duplicateColumn.isDefined) {
val attributesOfDuplicateCol = duplicateColumn.get.collect {
case a: AttributeReference => a
@@ -331,7 +329,7 @@
attributeOfCol.exists(a => a.semanticEquals(expr)))
if (!isStrictDuplicate) {
Alias(col, qualifiedName)(exprId = col.exprId)
- } else if (col.qualifier.nonEmpty) {
+ } else if (col.qualifier.isDefined) {
Alias(col, qualifiedName)(exprId = col.exprId)
// this check is added in scenario where the column is direct Attribute reference and
// since duplicate columns select is allowed, we should just put alias for those columns
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 6fbc87f..7e8eb96 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -18,12 +18,11 @@
package org.apache.carbondata.mv.rewrite
-import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
-import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.spark.sql.types.DataType
import org.apache.carbondata.mv.datamap.MVHelper
import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
@@ -96,12 +95,9 @@
      // Replace all compensation1 attributes with references of subsumer attribute set
val compensationFinal = compensation1.transformExpressions {
case ref: Attribute if subqueryAttributeSet.contains(ref) =>
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
- exprId = ref.exprId, qualifier = subsumerName)
+ AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
- CarbonToSparkAdapter.createAliasRef(
- alias.child, alias.name, alias.exprId, subsumerName)
+ Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
}
compensationFinal
} else {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index 2b4247e..cb2043e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -17,11 +17,10 @@
package org.apache.carbondata.mv.plans.modular
-import org.apache.spark.sql.{CarbonToSparkAdapter, SQLConf}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.Metadata
+import org.apache.spark.sql.SQLConf
import org.apache.carbondata.mv.plans
import org.apache.carbondata.mv.plans._
@@ -199,18 +198,18 @@
.isInstanceOf[Attribute]))
val aggOutputList = aggTransMap.values.flatMap(t => t._2)
.map { ref =>
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, Metadata.empty,
- ref.exprId, Some(hFactName))
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(hFactName))
}
val hFactOutputSet = hFact.outputSet
// Update the outputlist qualifier
val hOutputList = (attrOutputList ++ aggOutputList).map {attr =>
attr.transform {
case ref: Attribute if hFactOutputSet.contains(ref) =>
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, Metadata.empty,
- ref.exprId, Some(hFactName))
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(hFactName))
}
}.asInstanceOf[Seq[NamedExpression]]
@@ -218,9 +217,9 @@
val hPredList = s.predicateList.map{ pred =>
pred.transform {
case ref: Attribute if hFactOutputSet.contains(ref) =>
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, Metadata.empty,
- ref.exprId, Some(hFactName))
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(hFactName))
}
}
val hSel = s.copy(
@@ -242,9 +241,9 @@
val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
wip.transformExpressions {
case ref: Attribute if hFactOutputSet.contains(ref) =>
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, Metadata.empty,
- ref.exprId, Some(hFactName))
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(hFactName))
}
}
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index b694e78..30857c8 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -52,11 +52,9 @@
plan transform {
case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _, _), _, _, _) =>
val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
- val makeupmap: Map[Int, String] = children.zipWithIndex.flatMap {
+ val makeupmap = children.zipWithIndex.flatMap {
case (child, i) =>
- aq.find(child.outputSet.contains(_))
- .flatMap(_.qualifier.headOption)
- .map((i, _))
+ aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
}.toMap
g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 7068b7e..0bbacc4 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -110,7 +110,10 @@
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
SparkSQLUtil.getRemoveRedundantAliasesObj(),
- RemoveRedundantProject) ++
+ RemoveRedundantProject,
+ SimplifyCreateStructOps,
+ SimplifyCreateArrayOps,
+ SimplifyCreateMapOps) ++
extendedOperatorOptimizationRules: _*) ::
Batch(
"Check Cartesian Products", Once,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index 2033342..3b6c725 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -167,9 +167,7 @@
val aq = attributeSet.filter(_.qualifier.nonEmpty)
children.zipWithIndex.flatMap {
case (child, i) =>
- aq.find(child.outputSet.contains(_))
- .flatMap(_.qualifier.headOption)
- .map((i, _))
+ aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
}.toMap
}
@@ -355,13 +353,28 @@
Seq.empty)
case l: LogicalRelation =>
val tableIdentifier = l.catalogTable.map(_.identifier)
- val database = tableIdentifier.flatMap(_.database).orNull
- val table = tableIdentifier.map(_.table).orNull
+ val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
+ val table = tableIdentifier.map(_.table).getOrElse(null)
Some(database, table, l.output, Nil, NoFlags, Seq.empty)
case l: LocalRelation => // used for unit test
Some(null, null, l.output, Nil, NoFlags, Seq.empty)
case _ =>
+ // this check is added as we get MetastoreRelation in spark2.1,
+ // this is removed in later spark version
+ // TODO: this check can be removed once 2.1 support is removed from carbon
+ if (SparkUtil.isSparkVersionEqualTo("2.1") &&
+ plan.getClass.getName.equals("org.apache.spark.sql.hive.MetastoreRelation")) {
+ val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("catalogTable", plan)
+ .asInstanceOf[CatalogTable]
+ Some(catalogTable.database,
+ catalogTable.identifier.table,
+ plan.output,
+ Nil,
+ NoFlags,
+ Seq.empty)
+ } else {
None
+ }
}
}
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index 366284b..d3ce38d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -204,7 +204,7 @@
s.child match {
case a: Alias =>
val qualifierPrefix = a.qualifier
- .map(_ + ".").headOption.getOrElse("")
+ .map(_ + ".").getOrElse("")
s"$qualifierPrefix${
quoteIdentifier(a
.name)
@@ -221,7 +221,7 @@
s.child match {
case a: Alias =>
val qualifierPrefix = a.qualifier.map(_ + ".")
- .headOption.getOrElse("")
+ .getOrElse("")
s"$qualifierPrefix${ quoteIdentifier(a.name) }"
case other => other.sql
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
index c3a3a68..b17eea2 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -21,11 +21,8 @@
import scala.collection.immutable
-import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
-import org.apache.spark.sql.types.Metadata
-import org.apache.spark.util.SparkUtil
import org.apache.carbondata.mv.expressions.modular._
import org.apache.carbondata.mv.plans._
@@ -119,19 +116,18 @@
if (i > -1) {
// this is a walk around for mystery of spark qualifier
if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
- exprId = ref.exprId, qualifier = Some(aliasMap(i)))
+ AttributeReference(
+ ref.name,
+ ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
} else {
ref
}
} else {
attrMap.get(ref) match {
case Some(alias) =>
- CarbonToSparkAdapter.createAttributeReference(
+ AttributeReference(
alias.child.asInstanceOf[AttributeReference].name,
- ref.dataType, nullable = true, metadata = Metadata.empty,
- exprId = ref.exprId,
+ ref.dataType)(exprId = ref.exprId,
alias.child.asInstanceOf[AttributeReference].qualifier)
case None => ref
}
@@ -182,12 +178,13 @@
list = list :+ ((index, subqueryName))
newS = newS.transformExpressions {
case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, Metadata.empty,
- ref.exprId, Some(subqueryName))
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(subqueryName))
case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
- CarbonToSparkAdapter.createAliasRef(
- alias.child, alias.name, alias.exprId, Some(subqueryName))
+ Alias(alias.child, alias.name)(
+ exprId = alias.exprId,
+ qualifier = Some(subqueryName))
}
case _ =>
@@ -215,12 +212,13 @@
}
newG.transformExpressions {
case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
- CarbonToSparkAdapter.createAttributeReference(
- ref.name, ref.dataType, nullable = true, Metadata.empty,
- ref.exprId, Some(subqueryName))
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(subqueryName))
case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
- CarbonToSparkAdapter.createAliasRef(
- alias.child, alias.name, alias.exprId, Some(subqueryName))
+ Alias(alias.child, alias.name)(
+ exprId = alias.exprId,
+ qualifier = Some(subqueryName))
}.copy(alias = Some(subqueryName))
}
}
diff --git a/docs/alluxio-guide.md b/docs/alluxio-guide.md
index bad1fc0..b1bfeeb 100644
--- a/docs/alluxio-guide.md
+++ b/docs/alluxio-guide.md
@@ -50,7 +50,7 @@
### Running spark-shell
- Running the command in spark path
```$command
-./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
+./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
```
- Testing use alluxio by CarbonSession
```$scala
@@ -98,7 +98,7 @@
--master local \
--jars ${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar,${CARBONDATA_PATH}/examples/spark2/target/carbondata-examples-1.6.0-SNAPSHOT.jar \
--class org.apache.carbondata.examples.AlluxioExample \
-${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar \
+${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar \
false
```
**NOTE**: Please set runShell as false, which can avoid dependency on alluxio shell module.
diff --git a/docs/faq.md b/docs/faq.md
index 88ca186..16cdfa5 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -316,7 +316,7 @@
2. Use the following command :
```
- mvn -Pspark-2.4 -Dspark.version {yourSparkVersion} clean package
+ mvn -Pspark-2.1 -Dspark.version {yourSparkVersion} clean package
```
Note : Refrain from using "mvn clean package" without specifying the profile.
diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 0c49f35..483585f 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -237,9 +237,9 @@
$ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
```
Replace the spark and hadoop version with the version used in your cluster.
- For example, if you are using Spark 2.4.4, you would like to compile using:
+ For example, if you are using Spark 2.2.1 and Hadoop 2.7.2, you would like to compile using:
```
- mvn -DskipTests -Pspark-2.4 -Dspark.version=2.4.4 -Dhadoop.version=2.7.2 clean package
+ mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 -Dhadoop.version=2.7.2 clean package
```
Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index d007e03..b66fce4 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -37,11 +37,11 @@
- [CLOSE STREAM](#close-stream)
## Quick example
-Download and unzip spark-2.4.4-bin-hadoop2.7.tgz, and export $SPARK_HOME
+Download and unzip spark-2.2.0-bin-hadoop2.7.tgz, and export $SPARK_HOME
-Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.6.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
+Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
```shell
-mvn clean package -DskipTests -Pspark-2.4
+mvn clean package -DskipTests -Pspark-2.2
```
Start a socket data server in a terminal
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index d5c1188..b6921f2 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -37,9 +37,9 @@
s"$rootPath/examples/spark2/src/main/resources/log4j.properties")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
+ .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
- spark.sparkContext.setLogLevel("error")
+ spark.sparkContext.setLogLevel("INFO")
exampleBody(spark)
spark.close()
}
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
index de3b5bc..73bf941 100644
--- a/integration/flink/pom.xml
+++ b/integration/flink/pom.xml
@@ -184,6 +184,26 @@
<profiles>
<profile>
+ <id>spark-2.2</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 0790763..199ff84 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -191,103 +191,4 @@
</plugins>
</build>
- <profiles>
- <profile>
- <id>build-all</id>
- <properties>
- <spark.version>2.3.4</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- </profile>
- <profile>
- <id>sdvtest</id>
- <properties>
- <maven.test.skip>true</maven.test.skip>
- </properties>
- </profile>
- <profile>
- <id>spark-2.3</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <properties>
- <spark.version>2.3.4</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/main/spark2.4</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/spark2.3</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>spark-2.4</id>
- <properties>
- <spark.version>2.4.4</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/main/spark2.3</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/spark2.4</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
</project>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 2548110..f629260 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,12 +37,11 @@
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.adapter.CarbonToSparkAdapter
import org.apache.carbondata.spark.util.CommonUtil
object CsvRDDHelper {
@@ -94,9 +93,9 @@
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
- CarbonToSparkAdapter.createFilePartition(
+ FilePartition(
partitions.size,
- currentFiles)
+ currentFiles.toArray.toSeq)
partitions += newPartition
}
currentFiles.clear()
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index a46568a..13e7c45 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -17,22 +17,23 @@
package org.apache.spark.sql.util
+import java.lang.reflect.Method
+
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateView
+import org.apache.spark.sql.catalyst.analysis.EmptyRule
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
-import org.apache.spark.sql.catalyst.optimizer.{CheckCartesianProducts, EliminateOuterJoin, NullPropagation, PullupCorrelatedPredicates, RemoveRedundantAliases, ReorderJoin}
-import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
object SparkSQLUtil {
def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -50,60 +51,166 @@
}
def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): Statistics = {
- logicalPlanObj.stats
+ if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val method: Method = logicalPlanObj.getClass.getMethod("stats", classOf[SQLConf])
+ method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ val method: Method = logicalPlanObj.getClass.getMethod("stats")
+ method.invoke(logicalPlanObj).asInstanceOf[Statistics]
+ } else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
: NamedExpression = {
- QueryPlan.normalizeExprId(r, input)
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
+ clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
+ invoke(null, r, input).asInstanceOf[NamedExpression]
+ } else {
+ r
+ }
}
def getStatisticsObj(outputList: Seq[NamedExpression],
plan: LogicalPlan, stats: Statistics,
aliasMap: Option[AttributeMap[Attribute]] = None)
: Statistics = {
- val output = outputList.map(_.toAttribute)
- val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
- table => AttributeMap(table.output.zip(output))
+ val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val output = outputList.map(_.toAttribute)
+ val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+ table => AttributeMap(table.output.zip(output))
+ }
+ val rewrites = mapSeq.head
+ val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
+ getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
+ var attributeStats = AttributeMap(attributes.iterator
+ .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+ if (aliasMap.isDefined) {
+ attributeStats = AttributeMap(
+ attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+ }
+ val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
+ CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+ stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
+ } else {
+ val output = outputList.map(_.name)
+ val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+ table => table.output.map(_.name).zip(output).toMap
+ }
+ val rewrites = mapSeq.head
+ val colStats = CarbonReflectionUtils.getField("colStats", stats)
+ .asInstanceOf[Map[String, ColumnStat]]
+ var attributeStats = colStats.iterator
+ .map { pair => (rewrites(pair._1), pair._2) }.toMap
+ if (aliasMap.isDefined) {
+ val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
+ attributeStats =
+ attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
+ , pair._2))
+ }
+ CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+ stats.rowCount, attributeStats).asInstanceOf[Statistics]
}
- val rewrites = mapSeq.head
- val attributes : AttributeMap[ColumnStat] = stats.attributeStats
- var attributeStats = AttributeMap(attributes.iterator
- .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
- if (aliasMap.isDefined) {
- attributeStats = AttributeMap(
- attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
- }
- val hints = stats.hints
- Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, hints)
}
def getEliminateViewObj(): Rule[LogicalPlan] = {
- EliminateView
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
+ CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+ } else {
+ EmptyRule
+ }
}
def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
- PullupCorrelatedPredicates
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
+ CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+ } else {
+ EmptyRule
+ }
}
def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
- RemoveRedundantAliases
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
+ CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+ } else {
+ EmptyRule
+ }
}
def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
- ReorderJoin
+ if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
+ CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ }
+ else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
- EliminateOuterJoin
+ if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin";
+ CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ }
+ else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = {
- NullPropagation
+ if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation";
+ CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ } else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = {
- CheckCartesianProducts
+ if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
+ CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
+ CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+ }
+ else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
/**
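A detail worth noting in the restored SparkSQLUtil: on Spark 2.3 the optimizer rules (`ReorderJoin`, `EliminateOuterJoin`, `NullPropagation`, ...) are Scala `object`s rather than the `SQLConf`-parameterised case classes they were in 2.2, which is why the 2.3 branches look up class names ending in `$` and go through a private-constructor path. A hedged sketch of the usual way such a singleton is fetched reflectively (helper name hypothetical; the module's own `CarbonReflectionUtils` methods work along these lines):

```scala
// The singleton instance of a Scala `object` is held in the static MODULE$ field
// of the compiled "<name>$" class, so it can be read without calling a constructor.
def scalaObjectByName[T](className: String): T = {
  val clazz = Class.forName(className)        // e.g. "...optimizer.ReorderJoin$"
  clazz.getField("MODULE$").get(null).asInstanceOf[T]
}
```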
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 93e66ea..46692df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,22 +17,24 @@
package org.apache.spark.util
+import java.lang.reflect.Method
+
import scala.reflect.runtime._
import scala.reflect.runtime.universe._
-import org.apache.spark.SparkContext
+import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.parser.AstBuilder
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -58,19 +60,45 @@
def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
val im = rm.reflect(obj)
im.symbol.typeSignature.members.find(_.name.toString.equals(name))
- .map(l => im.reflectField(l.asTerm).get).orNull
+ .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
}
def getUnresolvedRelation(
tableIdentifier: TableIdentifier,
tableAlias: Option[String] = None): UnresolvedRelation = {
- UnresolvedRelation(tableIdentifier)
+ val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ createObject(
+ className,
+ tableIdentifier,
+ tableAlias)._1.asInstanceOf[UnresolvedRelation]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ createObject(
+ className,
+ tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
+ } else {
+ throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
+ }
}
def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
relation: LogicalPlan,
view: Option[TableIdentifier]): SubqueryAlias = {
- SubqueryAlias(alias.getOrElse(""), relation)
+ val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ createObject(
+ className,
+ alias.getOrElse(""),
+ relation,
+ Option(view))._1.asInstanceOf[SubqueryAlias]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ createObject(
+ className,
+ alias.getOrElse(""),
+ relation)._1.asInstanceOf[SubqueryAlias]
+ } else {
+ throw new UnsupportedOperationException("Unsupported Spark version")
+ }
}
def getInsertIntoCommand(table: LogicalPlan,
@@ -78,23 +106,58 @@
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean): InsertIntoTable = {
- InsertIntoTable(
- table,
- partition,
- query,
- overwrite,
- ifPartitionNotExists)
+ val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val overwriteOptions = createObject(
+ "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
+ overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
+ createObject(
+ className,
+ table,
+ partition,
+ query,
+ overwriteOptions,
+ ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2") ) {
+ createObject(
+ className,
+ table,
+ partition,
+ query,
+ overwrite.asInstanceOf[Object],
+ ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
+ } else {
+ throw new UnsupportedOperationException("Unsupported Spark version")
+ }
}
def getLogicalRelation(relation: BaseRelation,
expectedOutputAttributes: Seq[Attribute],
catalogTable: Option[CatalogTable],
isStreaming: Boolean): LogicalRelation = {
- new LogicalRelation(
- relation,
- expectedOutputAttributes.asInstanceOf[Seq[AttributeReference]],
- catalogTable,
- isStreaming)
+ val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ createObject(
+ className,
+ relation,
+ Some(expectedOutputAttributes),
+ catalogTable)._1.asInstanceOf[LogicalRelation]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ createObject(
+ className,
+ relation,
+ expectedOutputAttributes,
+ catalogTable)._1.asInstanceOf[LogicalRelation]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+ createObject(
+ className,
+ relation,
+ expectedOutputAttributes,
+ catalogTable,
+ isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
+ } else {
+ throw new UnsupportedOperationException("Unsupported Spark version")
+ }
}
@@ -145,28 +208,46 @@
def getSessionState(sparkContext: SparkContext,
carbonSession: Object,
useHiveMetaStore: Boolean): Any = {
- if (useHiveMetaStore) {
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
val className = sparkContext.conf.get(
CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
- "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
- val tuple = createObject(className, carbonSession, None)
- val method = tuple._2.getMethod("build")
- method.invoke(tuple._1)
+ "org.apache.spark.sql.hive.CarbonSessionState")
+ createObject(className, carbonSession)._1
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ if (useHiveMetaStore) {
+ val className = sparkContext.conf.get(
+ CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+ "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
+ val tuple = createObject(className, carbonSession, None)
+ val method = tuple._2.getMethod("build")
+ method.invoke(tuple._1)
+ } else {
+ val className = sparkContext.conf.get(
+ CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+ "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
+ val tuple = createObject(className, carbonSession, None)
+ val method = tuple._2.getMethod("build")
+ method.invoke(tuple._1)
+ }
} else {
- val className = sparkContext.conf.get(
- CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
- "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
- val tuple = createObject(className, carbonSession, None)
- val method = tuple._2.getMethod("build")
- method.invoke(tuple._1)
+ throw new UnsupportedOperationException("Spark version not supported")
}
}
def hasPredicateSubquery(filterExp: Expression) : Boolean = {
- val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
- val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
- val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
- hasSubquery
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
+ val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression])
+ val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
+ hasSubquery
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
+ val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
+ val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
+ hasSubquery
+ } else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
def getDescribeTableFormattedField[T: TypeTag : reflect.ClassTag](obj: T): Boolean = {
@@ -184,10 +265,19 @@
rdd: RDD[InternalRow],
partition: Partitioning,
metadata: Map[String, String]): RowDataSourceScanExec = {
- RowDataSourceScanExec(output, output.map(output.indexOf),
- pushedFilters.toSet, handledFilters.toSet, rdd,
- relation.relation,
- relation.catalogTable.map(_.identifier))
+ val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
+ if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
+ createObject(className, output, rdd, relation.relation,
+ partition, metadata,
+ relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+ } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ createObject(className, output, output.map(output.indexOf),
+ pushedFilters.toSet, handledFilters.toSet, rdd,
+ relation.relation,
+ relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+ } else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
def invokewriteAndReadMethod(dataSourceObj: DataSource,
@@ -197,7 +287,25 @@
mode: SaveMode,
query: LogicalPlan,
physicalPlan: SparkPlan): BaseRelation = {
- dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
+ if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val method: Method = dataSourceObj.getClass
+ .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
+ method.invoke(dataSourceObj, mode, dataFrame)
+ .asInstanceOf[BaseRelation]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+ val method: Method = dataSourceObj.getClass
+ .getMethod("writeAndRead",
+ classOf[SaveMode],
+ classOf[LogicalPlan],
+ classOf[Seq[String]],
+ classOf[SparkPlan])
+ // since spark 2.3.2 version (SPARK-PR#22346),
+ // change 'query.output' to 'query.output.map(_.name)'
+ method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
+ .asInstanceOf[BaseRelation]
+ } else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
/**
@@ -208,7 +316,9 @@
*/
def invokeAlterTableAddColumn(table: TableIdentifier,
colsToAdd: Seq[StructField]): Object = {
- AlterTableAddColumnsCommand(table, colsToAdd)
+ val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
+ CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
+ ._1.asInstanceOf[RunnableCommand]
}
def createSingleObject(className: String): Any = {
@@ -275,6 +385,16 @@
def invokeAnalyzerExecute(analyzer: Analyzer,
plan: LogicalPlan): LogicalPlan = {
- analyzer.executeAndCheck(plan)
+ if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val method: Method = analyzer.getClass
+ .getMethod("execute", classOf[LogicalPlan])
+ method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+ val method: Method = analyzer.getClass
+ .getMethod("executeAndCheck", classOf[LogicalPlan])
+ method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+ } else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
}
}
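The restored CarbonReflectionUtils builds Spark classes such as `SubqueryAlias`, `InsertIntoTable` and `LogicalRelation` by name because their constructor arities differ between Spark 2.1, 2.2 and 2.3. A minimal sketch of that construction pattern (a simplified, hypothetical stand-in for the module's `createObject`), assuming the first declared constructor is the one wanted:

```scala
def createByName(className: String, args: AnyRef*): (Any, Class[_]) = {
  val clazz = Class.forName(className)
  val ctor = clazz.getConstructors.head   // assumes the first constructor matches the args
  ctor.setAccessible(true)
  (ctor.newInstance(args: _*), clazz)
}
```

On Spark 2.2, for example, `SubqueryAlias` takes just an alias string and a child plan, so the two-argument branch above can be served by such a helper, while the 2.1 branch additionally passes the `Option[TableIdentifier]` view argument.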
diff --git a/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
deleted file mode 100644
index be82907..0000000
--- a/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.adapter
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
- def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
- FilePartition(index, files.toArray)
- }
-
- def createAttributeReference(
- name: String,
- dataType: DataType,
- nullable: Boolean,
- metadata: Metadata,
- exprId: ExprId,
- qualifier: Option[String],
- attrRef : NamedExpression = null): AttributeReference = {
- val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
- AttributeReference(
- name,
- dataType,
- nullable,
- metadata)(exprId, qf)
- }
-
- def createAliasRef(
- child: Expression,
- name: String,
- exprId: ExprId = NamedExpression.newExprId,
- qualifier: Option[String] = None,
- explicitMetadata: Option[Metadata] = None,
- namedExpr : Option[NamedExpression] = None ) : Alias = {
- Alias(child, name)(
- exprId,
- if (qualifier.nonEmpty) Seq(qualifier.get) else Seq(),
- explicitMetadata)
- }
-}
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index 1f1cac3..cda1954 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -192,6 +192,86 @@
</properties>
</profile>
<profile>
+ <id>spark-2.1</id>
+ <properties>
+ <spark.version>2.1.0</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.3plus</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.1andspark2.2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.2</id>
+ <properties>
+ <spark.version>2.2.1</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.3plus</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.1andspark2.2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
@@ -206,6 +286,30 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.1andspark2.2</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.3plus</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
</plugins>
</build>
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..605df66
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.api.Binary;
+
+public class CarbonDictionaryWrapper extends Dictionary {
+
+ private Binary[] binaries;
+
+ CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
+ super(encoding);
+ binaries = new Binary[dictionary.getDictionarySize()];
+ for (int i = 0; i < binaries.length; i++) {
+ binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
+ }
+ }
+
+ @Override
+ public int getMaxId() {
+ return binaries.length - 1;
+ }
+
+ @Override
+ public Binary decodeToBinary(int id) {
+ return binaries[id];
+ }
+}
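
Editor's note, not part of the patch: a minimal sketch of how the restored wrapper could be exercised from code in the same `org.apache.spark.sql` package (the constructor is package-private). The class name `DictionaryDecodeSketch` and the idea of decoding every id up front are illustrative assumptions only.

```java
package org.apache.spark.sql;

import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.io.api.Binary;

// Hypothetical helper, not in the patch: decodes every dictionary id once so
// repeated lookups can reuse the materialised Binary values.
public class DictionaryDecodeSketch {

  static Binary[] decodeAll(CarbonDictionary carbonDictionary) {
    // The wrapper copies the dictionary values into Binary instances eagerly.
    CarbonDictionaryWrapper wrapper =
        new CarbonDictionaryWrapper(Encoding.PLAIN, carbonDictionary);
    Binary[] decoded = new Binary[wrapper.getMaxId() + 1];
    for (int id = 0; id <= wrapper.getMaxId(); id++) {
      decoded[id] = wrapper.decodeToBinary(id);
    }
    return decoded;
  }
}
```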
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
new file mode 100644
index 0000000..7d23d7c
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import java.lang.reflect.Field;
+import java.math.BigInteger;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Adapter class which handles columnar vector reading of CarbonData on top of
+ * the Spark ColumnVector and ColumnarBatch APIs. This proxy class hides the
+ * complexity of Spark 2.3 related API changes, since the Spark ColumnVector
+ * and ColumnarBatch interfaces are still evolving.
+ */
+public class CarbonVectorProxy {
+
+ private ColumnarBatch columnarBatch;
+ private ColumnVectorProxy[] columnVectorProxies;
+
+
+ private void updateColumnVectors() {
+ try {
+ Field field = columnarBatch.getClass().getDeclaredField("columns");
+ field.setAccessible(true);
+ field.set(columnarBatch, columnVectorProxies);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+   * Allocates a ColumnarBatch and wraps each of its column vectors in a
+   * ColumnVectorProxy (or ColumnVectorProxyWithLazyLoad when lazy loading is
+   * requested), so that CarbonData can write into and read from the batch.
+   *
+   * @param memMode      whether the vectors are allocated on-heap or off-heap.
+   * @param outputSchema metadata related to the current schema of the table.
+   * @param rowNum       number of rows for vector reading.
+   * @param useLazyLoad  whether to use lazy loading while getting the data.
+ */
+ public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
+ boolean useLazyLoad) {
+ columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+ columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+ for (int i = 0; i < columnVectorProxies.length; i++) {
+ if (useLazyLoad) {
+ columnVectorProxies[i] =
+ new ColumnVectorProxyWithLazyLoad(columnarBatch.column(i), rowNum, memMode);
+ } else {
+ columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
+ }
+ }
+ updateColumnVectors();
+ }
+
+ public ColumnVectorProxy getColumnVector(int ordinal) {
+ return columnVectorProxies[ordinal];
+ }
+
+ /**
+ * Returns the number of rows for read, including filtered rows.
+ */
+ public int numRows() {
+ return columnarBatch.capacity();
+ }
+
+  /**
+   * Returns the underlying ColumnVector of this batch at the given ordinal.
+   *
+   * @param ordinal position of the column in the batch
+   * @return the ColumnVector at the given ordinal
+   */
+ public ColumnVector column(int ordinal) {
+ return columnarBatch.column(ordinal);
+ }
+
+ /**
+   * Resets all columns of this batch for writing. The currently stored values are no
+   * longer accessible.
+ */
+ public void reset() {
+ for (int i = 0; i < columnarBatch.numCols(); i++) {
+ ((ColumnVectorProxy) columnarBatch.column(i)).reset();
+ }
+ }
+
+ public void resetDictionaryIds(int ordinal) {
+ (((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
+ }
+
+ /**
+ * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+ */
+ public InternalRow getRow(int rowId) {
+ return columnarBatch.getRow(rowId);
+ }
+
+  /**
+   * Returns the underlying ColumnarBatch that this proxy wraps.
+   */
+ public Object getColumnarBatch() {
+ return columnarBatch;
+ }
+
+ /**
+ * Called to close all the columns in this batch. It is not valid to access the data after
+ * calling this. This must be called at the end to clean up memory allocations.
+ */
+ public void close() {
+ columnarBatch.close();
+ }
+
+ /**
+ * Sets the number of rows in this batch.
+ */
+ public void setNumRows(int numRows) {
+ columnarBatch.setNumRows(numRows);
+ }
+
+ public DataType dataType(int ordinal) {
+ return columnarBatch.column(ordinal).dataType();
+ }
+
+ public static class ColumnVectorProxy extends ColumnVector {
+
+ private ColumnVector vector;
+
+ public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
+ super(capacity, columnVector.dataType(), mode);
+ try {
+ Field childColumns =
+ columnVector.getClass().getSuperclass().getDeclaredField("childColumns");
+ childColumns.setAccessible(true);
+ Object o = childColumns.get(columnVector);
+ childColumns.set(this, o);
+ Field resultArray =
+ columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
+ resultArray.setAccessible(true);
+ Object o1 = resultArray.get(columnVector);
+ resultArray.set(this, o1);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ vector = columnVector;
+ }
+
+ public void putRowToColumnBatch(int rowId, Object value) {
+ org.apache.spark.sql.types.DataType t = dataType();
+ if (null == value) {
+ putNull(rowId);
+ } else {
+ if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+ putBoolean(rowId, (boolean) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+ putByte(rowId, (byte) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+ putShort(rowId, (short) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+ putInt(rowId, (int) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+ putLong(rowId, (long) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+ putFloat(rowId, (float) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+ putDouble(rowId, (double) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+ UTF8String v = (UTF8String) value;
+ putByteArray(rowId, v.getBytes());
+ } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+ DecimalType dt = (DecimalType) t;
+ Decimal d = Decimal.fromDecimal(value);
+ if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+ putInt(rowId, (int) d.toUnscaledLong());
+ } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+ putLong(rowId, d.toUnscaledLong());
+ } else {
+ final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+ byte[] bytes = integer.toByteArray();
+ putByteArray(rowId, bytes, 0, bytes.length);
+ }
+ } else if (t instanceof CalendarIntervalType) {
+ CalendarInterval c = (CalendarInterval) value;
+ vector.getChildColumn(0).putInt(rowId, c.months);
+ vector.getChildColumn(1).putLong(rowId, c.microseconds);
+ } else if (t instanceof org.apache.spark.sql.types.DateType) {
+ putInt(rowId, (int) value);
+ } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+ putLong(rowId, (long) value);
+ }
+ }
+ }
+
+ public void putBoolean(int rowId, boolean value) {
+ vector.putBoolean(rowId, value);
+ }
+
+ public void putByte(int rowId, byte value) {
+ vector.putByte(rowId, value);
+ }
+
+ public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ vector.putBytes(rowId, count, src, srcIndex);
+ }
+
+ public void putShort(int rowId, short value) {
+ vector.putShort(rowId, value);
+ }
+
+ public void putInt(int rowId, int value) {
+ vector.putInt(rowId, value);
+ }
+
+ public void putFloat(int rowId, float value) {
+ vector.putFloat(rowId, value);
+ }
+
+ public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ vector.putFloats(rowId, count, src, srcIndex);
+ }
+
+ public void putLong(int rowId, long value) {
+ vector.putLong(rowId, value);
+ }
+
+ public void putDouble(int rowId, double value) {
+ vector.putDouble(rowId, value);
+ }
+
+ public void putInts(int rowId, int count, int value) {
+ vector.putInts(rowId, count, value);
+ }
+
+ public void putInts(int rowId, int count, int[] src, int srcIndex) {
+ vector.putInts(rowId, count, src, srcIndex);
+ }
+
+ public void putShorts(int rowId, int count, short value) {
+ vector.putShorts(rowId, count, value);
+ }
+
+ public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ vector.putShorts(rowId, count, src, srcIndex);
+ }
+
+ public void putLongs(int rowId, int count, long value) {
+ vector.putLongs(rowId, count, value);
+ }
+
+ public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ vector.putLongs(rowId, count, src, srcIndex);
+ }
+
+ public void putDoubles(int rowId, int count, double value) {
+ vector.putDoubles(rowId, count, value);
+ }
+
+ public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ vector.putDoubles(rowId, count, src, srcIndex);
+ }
+
+ public DataType dataType(int ordinal) {
+ return vector.dataType();
+ }
+
+ public void putNotNull(int rowId) {
+ vector.putNotNull(rowId);
+ }
+
+ public void putNotNulls(int rowId, int count) {
+ vector.putNotNulls(rowId, count);
+ }
+
+ public void putDictionaryInt(int rowId, int value) {
+ vector.getDictionaryIds().putInt(rowId, value);
+ }
+
+ public void setDictionary(CarbonDictionary dictionary) {
+ if (null != dictionary) {
+ CarbonDictionaryWrapper dictionaryWrapper =
+ new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
+ vector.setDictionary(dictionaryWrapper);
+ this.dictionary = dictionaryWrapper;
+ } else {
+ this.dictionary = null;
+ vector.setDictionary(null);
+ }
+ }
+
+ public void putNull(int rowId) {
+ vector.putNull(rowId);
+ }
+
+ public void putNulls(int rowId, int count) {
+ vector.putNulls(rowId, count);
+ }
+
+ public boolean hasDictionary() {
+ return vector.hasDictionary();
+ }
+
+ public ColumnVector reserveDictionaryIds(int capacity) {
+ this.dictionaryIds = vector.reserveDictionaryIds(capacity);
+ return dictionaryIds;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNullAt(i);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return vector.getBoolean(i);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return vector.getByte(i);
+ }
+
+ @Override
+ public short getShort(int i) {
+ return vector.getShort(i);
+ }
+
+ @Override
+ public int getInt(int i) {
+ return vector.getInt(i);
+ }
+
+ @Override
+ public long getLong(int i) {
+ return vector.getLong(i);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return vector.getFloat(i);
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return vector.getDouble(i);
+ }
+
+ @Override
+ protected void reserveInternal(int capacity) {
+ }
+
+ @Override
+ public void reserve(int requiredCapacity) {
+ vector.reserve(requiredCapacity);
+ }
+
+ @Override
+ public long nullsNativeAddress() {
+ return vector.nullsNativeAddress();
+ }
+
+ @Override
+ public long valuesNativeAddress() {
+ return vector.valuesNativeAddress();
+ }
+
+ @Override
+ public void putBooleans(int rowId, int count, boolean value) {
+ vector.putBooleans(rowId, count, value);
+ }
+
+ @Override
+ public void putBytes(int rowId, int count, byte value) {
+ vector.putBytes(rowId, count, value);
+ }
+
+ @Override
+ public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ vector.putIntsLittleEndian(rowId, count, src, srcIndex);
+ }
+
+ @Override
+ public int getDictId(int rowId) {
+ return vector.getDictId(rowId);
+ }
+
+ @Override
+ public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ vector.putLongsLittleEndian(rowId, count, src, srcIndex);
+ }
+
+ @Override
+ public void putFloats(int rowId, int count, float value) {
+ vector.putFloats(rowId, count, value);
+ }
+
+ @Override
+ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+ vector.putFloats(rowId, count, src, srcIndex);
+ }
+
+ @Override
+ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+ vector.putDoubles(rowId, count, src, srcIndex);
+ }
+
+ @Override
+ public void putArray(int rowId, int offset, int length) {
+ vector.putArray(rowId, offset, length);
+ }
+
+ @Override
+ public int getArrayLength(int rowId) {
+ return vector.getArrayLength(rowId);
+ }
+
+ @Override
+ public int getArrayOffset(int rowId) {
+ return vector.getArrayOffset(rowId);
+ }
+
+ @Override
+ public void loadBytes(Array array) {
+ vector.loadBytes(array);
+ }
+
+ @Override
+ public int putByteArray(int rowId, byte[] value, int offset, int count) {
+ return vector.putByteArray(rowId, value, offset, count);
+ }
+
+    /**
+     * Appends the binary data of all rows to the vector's array data buffer.
+     * Should be used along with {@link #putArray(int, int, int)} to record the
+     * per-row lengths and offsets.
+     */
+ public void putAllByteArray(byte[] data, int offset, int length) {
+ vector.arrayData().appendBytes(length, data, offset);
+ }
+
+ @Override
+ public void close() {
+ vector.close();
+ }
+
+ public void reset() {
+ if (isConstant) {
+ return;
+ }
+ vector.reset();
+ }
+
+ public void setLazyPage(LazyPageLoader lazyPage) {
+ lazyPage.loadPage();
+ }
+
+ public ColumnVector getVector() {
+ return vector;
+ }
+ }
+
+ public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
+
+ private ColumnVector vector;
+
+ private LazyPageLoader pageLoad;
+
+ private boolean isLoaded;
+
+ public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector, int capacity, MemoryMode mode) {
+ super(columnVector, capacity, mode);
+ vector = columnVector;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ checkPageLoaded();
+ return vector.isNullAt(i);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ checkPageLoaded();
+ return vector.getBoolean(i);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ checkPageLoaded();
+ return vector.getByte(i);
+ }
+
+ @Override
+ public short getShort(int i) {
+ checkPageLoaded();
+ return vector.getShort(i);
+ }
+
+ @Override
+ public int getInt(int i) {
+ checkPageLoaded();
+ return vector.getInt(i);
+ }
+
+ @Override
+ public long getLong(int i) {
+ checkPageLoaded();
+ return vector.getLong(i);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ checkPageLoaded();
+ return vector.getFloat(i);
+ }
+
+ @Override
+ public double getDouble(int i) {
+ checkPageLoaded();
+ return vector.getDouble(i);
+ }
+
+ @Override
+ public int getArrayLength(int rowId) {
+ checkPageLoaded();
+ return vector.getArrayLength(rowId);
+ }
+
+ @Override
+ public int getArrayOffset(int rowId) {
+ checkPageLoaded();
+ return vector.getArrayOffset(rowId);
+ }
+
+ private void checkPageLoaded() {
+ if (!isLoaded) {
+ if (pageLoad != null) {
+ pageLoad.loadPage();
+ }
+ isLoaded = true;
+ }
+ }
+
+ public void reset() {
+ if (isConstant) {
+ return;
+ }
+ isLoaded = false;
+ vector.reset();
+ }
+
+ public void setLazyPage(LazyPageLoader lazyPage) {
+ this.pageLoad = lazyPage;
+ }
+
+ }
+}
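
Editor's note, not part of the patch: a hedged usage sketch of the adapter restored above, assuming Spark 2.1/2.2 and the CarbonData modules are on the classpath. The class name, the two-column schema, and the literal values are made up for illustration; the real callers are CarbonData's vectorized readers.

```java
package org.apache.spark.sql;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

// Hypothetical driver, not in the patch: fills a small on-heap batch through
// the proxy and reads it back row-wise.
public class CarbonVectorProxySketch {

  public static void main(String[] args) {
    StructType schema = new StructType()
        .add("id", DataTypes.IntegerType)
        .add("name", DataTypes.StringType);

    // Allocate a 4-row on-heap batch; useLazyLoad=false picks ColumnVectorProxy.
    CarbonVectorProxy proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 4, false);
    for (int row = 0; row < 4; row++) {
      proxy.getColumnVector(0).putRowToColumnBatch(row, row);
      proxy.getColumnVector(1).putRowToColumnBatch(row, UTF8String.fromString("row-" + row));
    }
    proxy.setNumRows(4);

    // Rows returned by getRow are reused across calls, so read before advancing.
    InternalRow first = proxy.getRow(0);
    System.out.println(first.getInt(0) + " -> " + first.getUTF8String(1));
    proxy.close();
  }
}
```

The proxy constructor swaps the batch's private `columns` field via reflection so Spark's own row accessors go through the proxies; with `useLazyLoad=true` the `ColumnVectorProxyWithLazyLoad` variant defers `LazyPageLoader.loadPage()` until the first getter through `checkPageLoaded()`.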
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
similarity index 100%
rename from integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
rename to integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
similarity index 100%
rename from integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
rename to integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
similarity index 100%
rename from integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
rename to integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index dfebf6f..7b65e0b 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -253,12 +253,9 @@
</properties>
</profile>
<profile>
- <id>spark-2.3</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
+ <id>spark-2.1</id>
<properties>
- <spark.version>2.3.4</spark.version>
+ <spark.version>2.1.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
@@ -269,7 +266,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
- <exclude>src/main/spark2.4</exclude>
+ <exclude>src/main/spark2.2</exclude>
+ <exclude>src/main/spark2.3</exclude>
+ <exclude>src/main/commonTo2.2And2.3</exclude>
</excludes>
</configuration>
</plugin>
@@ -286,7 +285,8 @@
</goals>
<configuration>
<sources>
- <source>src/main/spark2.3</source>
+ <source>src/main/spark2.1</source>
+ <source>src/main/commonTo2.1And2.2</source>
</sources>
</configuration>
</execution>
@@ -296,9 +296,9 @@
</build>
</profile>
<profile>
- <id>spark-2.4</id>
+ <id>spark-2.2</id>
<properties>
- <spark.version>2.4.4</spark.version>
+ <spark.version>2.2.1</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
@@ -309,6 +309,7 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
+ <exclude>src/main/spark2.1</exclude>
<exclude>src/main/spark2.3</exclude>
</excludes>
</configuration>
@@ -326,7 +327,55 @@
</goals>
<configuration>
<sources>
- <source>src/main/spark2.4</source>
+ <source>src/main/spark2.2</source>
+ <source>src/main/commonTo2.2And2.3</source>
+ <source>src/main/commonTo2.1And2.2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.3</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <spark.version>2.3.4</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.1</exclude>
+ <exclude>src/main/spark2.2</exclude>
+ <exclude>src/main/commonTo2.1And2.2</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.3</source>
+ <source>src/main/commonTo2.2And2.3</source>
</sources>
</configuration>
</execution>
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
similarity index 81%
rename from integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
rename to integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 60ee7ea..7605574 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.execution.strategy
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -32,7 +32,7 @@
val rdd: RDD[InternalRow],
@transient override val relation: HadoopFsRelation,
val partitioning: Partitioning,
- val md: Map[String, String],
+ override val metadata: Map[String, String],
identifier: Option[TableIdentifier],
@transient private val logicalRelation: LogicalRelation)
extends FileSourceScanExec(
@@ -40,20 +40,14 @@
output,
relation.dataSchema,
Seq.empty,
- None,
Seq.empty,
identifier) {
- // added lazy since spark 2.3.2 version (SPARK-PR#21815)
- override lazy val supportsBatch: Boolean = true
+ override val supportsBatch: Boolean = true
- // added lazy since spark 2.3.2 version (SPARK-PR#21815)
- override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+ override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
(partitioning, Nil)
- // added lazy since spark 2.3.2 version (SPARK-PR#21815)
- override lazy val metadata: Map[String, String] = md
-
override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
similarity index 98%
rename from integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
index 7b20c06..d180cd3 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -38,7 +37,6 @@
output,
outputSchema,
partitionFilters,
- None,
dataFilters,
tableIdentifier)
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
similarity index 99%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
index 04beea7..8f4d45e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -47,4 +47,4 @@
logicalPlan
}
}
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
similarity index 97%
rename from integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
index 4ad3b11..36f166d 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -21,20 +21,21 @@
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
similarity index 68%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
index e19966f..72d3ae2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
@@ -1,20 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.spark.sql.hive
import org.apache.spark.sql.CarbonDatasourceHadoopRelation
@@ -46,7 +29,7 @@
}
Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
- case In(value, Seq(l: ListQuery)) =>
+ case In(value, Seq(l:ListQuery)) =>
val tPlan = l.plan.transform {
case lr: LogicalRelation
if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
@@ -58,4 +41,4 @@
}
transFormedPlan
}
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
similarity index 92%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
index 44b3bfd..f78c785 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -27,11 +27,12 @@
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.CarbonSparkSqlParser
@@ -60,8 +61,8 @@
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader)
extends HiveSessionCatalog (
- SparkAdapter.getExternalCatalogCatalog(externalCatalog),
- SparkAdapter.getGlobalTempViewManager(globalTempViewManager),
+ externalCatalog,
+ globalTempViewManager,
new HiveMetastoreCatalog(sparkSession),
functionRegistry,
conf,
@@ -110,9 +111,8 @@
* @return
*/
override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
- SparkAdapter.getHiveExternalCatalog(
- sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- ).client
+ sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+ .asInstanceOf[HiveExternalCatalog].client
}
override def alterAddColumns(tableIdentifier: TableIdentifier,
@@ -174,10 +174,9 @@
* @param identifier
* @return
*/
- override def getPartitionsAlternate(
- partitionFilters: Seq[Expression],
+ override def getPartitionsAlternate(partitionFilters: Seq[Expression],
sparkSession: SparkSession,
- identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+ identifier: TableIdentifier) = {
CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
}
@@ -234,14 +233,14 @@
}
private def externalCatalog: HiveExternalCatalog =
- SparkAdapter.getHiveExternalCatalog(session.sharedState.externalCatalog)
+ session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
/**
* Create a Hive aware resource loader.
*/
override protected lazy val resourceLoader: HiveSessionResourceLoader = {
val client: HiveClient = externalCatalog.client.newSession()
- new HiveSessionResourceLoader(session, SparkAdapter.getHiveClient(client))
+ new HiveSessionResourceLoader(session, client)
}
override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
@@ -275,4 +274,4 @@
}
override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
similarity index 85%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e9bcb43..e3f1d3f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -1,36 +1,39 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.spark.sql.hive
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.Callable
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
-import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.util.SparkTypeConverter
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.execution.datasources.LogicalRelation
/**
* This class refresh the relation from cache if the carbontable in
@@ -174,7 +177,8 @@
def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
plan match {
- case sa@SubqueryAlias(_, MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
+ case sa@SubqueryAlias(_,
+ MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
sa.copy(child = sa.child.asInstanceOf[LogicalRelation].copy())
case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
plan.asInstanceOf[LogicalRelation].copy()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
similarity index 100%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
similarity index 98%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index 0335b36..ee9fb0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -1,3 +1,4 @@
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -19,12 +20,12 @@
import java.net.URI
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.util.CarbonReflectionUtils
/**
@@ -48,7 +49,7 @@
Seq.empty
}
- override def processData(sparkSession: SparkSession): Seq[Row] = {
+ override def processData(sparkSession: SparkSession): Seq[Row] ={
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
similarity index 100%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
rename to integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index 78d6a46..aa650e0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -17,8 +17,13 @@
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
case class CastExpr(expr: Expression) extends Filter {
override def references: Array[String] = null
@@ -28,6 +33,26 @@
override def references: Array[String] = null
}
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
+ extends LeafExpression with NamedExpression with CodegenFallback {
+
+ type EvaluatedType = Any
+
+ override def toString: String = s"input[" + colExp.getColIndex + "]"
+
+ override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
+
+ override def name: String = colExp.getColumnName
+
+ override def toAttribute: Attribute = throw new UnsupportedOperationException
+
+ override def exprId: ExprId = throw new UnsupportedOperationException
+
+ override def qualifier: Option[String] = null
+
+ override def newInstance(): NamedExpression = throw new UnsupportedOperationException
+}
+
case class CarbonEndsWith(expr: Expression) extends Filter {
override def references: Array[String] = null
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 23c078a..e020a99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -258,67 +258,41 @@
s"""
|org.apache.spark.sql.DictTuple $value = $decodeDecimal($dictRef, ${ev.value});
""".stripMargin
- CarbonToSparkAdapter.createExprCode(
- code,
- s"$value.getIsNull()",
- s"((org.apache.spark.sql.types.Decimal)$value.getValue())",
- expr.dataType)
+ ExprCode(code, s"$value.getIsNull()",
+ s"((org.apache.spark.sql.types.Decimal)$value.getValue())")
} else {
getDictionaryColumnIds(index)._3.getDataType match {
case CarbonDataTypes.INT => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeInt($dictRef, ${ ev.value });
""".stripMargin
- CarbonToSparkAdapter.createExprCode(
- code,
- s"$value.getIsNull()",
- s"((Integer)$value.getValue())",
- expr.dataType)
+ ExprCode(code, s"$value.getIsNull()", s"((Integer)$value.getValue())")
case CarbonDataTypes.SHORT => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeShort($dictRef, ${ ev.value });
""".stripMargin
- CarbonToSparkAdapter.createExprCode(
- code,
- s"$value.getIsNull()",
- s"((Short)$value.getValue())",
- expr.dataType)
+ ExprCode(code, s"$value.getIsNull()", s"((Short)$value.getValue())")
case CarbonDataTypes.DOUBLE => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeDouble($dictRef, ${ ev.value });
""".stripMargin
- CarbonToSparkAdapter.createExprCode(
- code,
- s"$value.getIsNull()",
- s"((Double)$value.getValue())",
- expr.dataType)
+ ExprCode(code, s"$value.getIsNull()", s"((Double)$value.getValue())")
case CarbonDataTypes.LONG => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeLong($dictRef, ${ ev.value });
""".stripMargin
- CarbonToSparkAdapter.createExprCode(
- code,
- s"$value.getIsNull()",
- s"((Long)$value.getValue())",
- expr.dataType)
+ ExprCode(code, s"$value.getIsNull()", s"((Long)$value.getValue())")
case CarbonDataTypes.BOOLEAN => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeBool($dictRef, ${ ev.value });
""".stripMargin
- CarbonToSparkAdapter.createExprCode(
- code,
- s"$value.getIsNull()",
- s"((Boolean)$value.getValue())",
- expr.dataType)
+ ExprCode(code, s"$value.getIsNull()", s"((Boolean)$value.getValue())")
case _ => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeStr($dictRef, ${ev.value});
""".stripMargin
- CarbonToSparkAdapter.createExprCode(
- code,
- s"$value.getIsNull()",
- s"((UTF8String)$value.getValue())",
- expr.dataType)
+ ExprCode(code, s"$value.getIsNull()", s"((UTF8String)$value.getValue())")
+
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 703df20..376d121 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -40,9 +40,11 @@
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.CarbonScalaUtil
import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamingQueryListener, StreamSinkFactory}
@@ -181,9 +183,12 @@
LOGGER.warn("Carbon Table [" +dbName +"] [" +tableName +"] is not found, " +
"Now existing Schema will be overwritten with default properties")
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
- val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
+ val identifier = AbsoluteTableIdentifier.from(
+ CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),
+ dbName,
+ tableName)
val updatedParams = CarbonSource.updateAndCreateTable(
- dbName, tableName, tablePath, dataSchema, sparkSession, metaStore, parameters, None)
+ identifier, dataSchema, sparkSession, metaStore, parameters, None)
(CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
@@ -273,10 +278,9 @@
lazy val listenerAdded = new mutable.HashMap[Int, Boolean]()
def createTableInfoFromParams(
- databaseName: String,
- tableName: String,
parameters: Map[String, String],
dataSchema: StructType,
+ identifier: AbsoluteTableIdentifier,
query: Option[LogicalPlan],
sparkSession: SparkSession): TableModel = {
val sqlParser = new CarbonSpark2SqlParser
@@ -297,8 +301,8 @@
sqlParser.getFields(dataSchema)
}
val bucketFields = sqlParser.getBucketFields(map, fields, options)
- sqlParser.prepareTableModel(ifNotExistPresent = false, Option(databaseName),
- tableName, fields, Nil, map, bucketFields)
+ sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
+ identifier.getTableName, fields, Nil, map, bucketFields)
}
/**
@@ -310,8 +314,7 @@
def updateCatalogTableWithCarbonSchema(
tableDesc: CatalogTable,
sparkSession: SparkSession,
- query: Option[LogicalPlan] = None,
- persistSchema: Boolean = true): CatalogTable = {
+ query: Option[LogicalPlan] = None): CatalogTable = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
val storageFormat = tableDesc.storage
val properties = storageFormat.properties
@@ -319,16 +322,14 @@
val tablePath = CarbonEnv.getTablePath(
tableDesc.identifier.database, tableDesc.identifier.table)(sparkSession)
val dbName = CarbonEnv.getDatabaseName(tableDesc.identifier.database)(sparkSession)
+ val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableDesc.identifier.table)
val map = updateAndCreateTable(
- dbName,
- tableDesc.identifier.table,
- tablePath,
+ identifier,
tableDesc.schema,
sparkSession,
metaStore,
properties,
- query,
- persistSchema)
+ query)
// updating params
val updatedFormat = CarbonToSparkAdapter
.getUpdatedStorageFormat(storageFormat, map, tablePath)
@@ -350,56 +351,36 @@
}
}
- def createTableInfo(
- databaseName: String,
- tableName: String,
- tablePath: String,
- dataSchema: StructType,
- properties: Map[String, String],
- query: Option[LogicalPlan],
- sparkSession: SparkSession
- ): TableInfo = {
- val model = createTableInfoFromParams(
- databaseName, tableName, properties, dataSchema, query, sparkSession)
- val tableInfo = TableNewProcessor(model)
- val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
- tableInfo.setTablePath(tablePath)
- tableInfo.setTransactionalTable(isTransactionalTable)
- tableInfo.setDatabaseName(databaseName)
- val schemaEvolutionEntry = new SchemaEvolutionEntry
- schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
- tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
- tableInfo
- }
-
def updateAndCreateTable(
- databaseName: String,
- tableName: String,
- tablePath: String,
+ identifier: AbsoluteTableIdentifier,
dataSchema: StructType,
sparkSession: SparkSession,
metaStore: CarbonMetaStore,
properties: Map[String, String],
- query: Option[LogicalPlan],
- persistSchema: Boolean = true): Map[String, String] = {
- val tableInfo = createTableInfo(
- databaseName, tableName, tablePath, dataSchema, properties, query, sparkSession)
- val map = if (persistSchema &&
- !metaStore.isReadFromHiveMetaStore &&
- tableInfo.isTransactionalTable) {
- metaStore.saveToDisk(tableInfo, tablePath)
+ query: Option[LogicalPlan]): Map[String, String] = {
+ val model = createTableInfoFromParams(properties, dataSchema, identifier, query, sparkSession)
+ val tableInfo: TableInfo = TableNewProcessor(model)
+ val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
+ tableInfo.setTablePath(identifier.getTablePath)
+ tableInfo.setTransactionalTable(isTransactionalTable)
+ tableInfo.setDatabaseName(identifier.getDatabaseName)
+ val schemaEvolutionEntry = new SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+ tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+ val map = if (!metaStore.isReadFromHiveMetaStore && isTransactionalTable) {
+ metaStore.saveToDisk(tableInfo, identifier.getTablePath)
new java.util.HashMap[String, String]()
} else {
CarbonUtil.convertToMultiStringMap(tableInfo)
}
properties.foreach(e => map.put(e._1, e._2))
- map.put("tablepath", tablePath)
- map.put("dbname", databaseName)
+ map.put("tablepath", identifier.getTablePath)
+ map.put("dbname", identifier.getDatabaseName)
if (map.containsKey("tableName")) {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
LOGGER.warn("tableName is not required in options, ignoring it")
}
- map.put("tableName", tableName)
+ map.put("tableName", identifier.getTableName)
map.asScala.toMap
}
}
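
Editor's note, not part of the patch: a tiny sketch of the `AbsoluteTableIdentifier` construction that the restored `updateAndCreateTable` above depends on, assuming carbondata-core is on the classpath; the store path, database, and table names are made-up values.

```java
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

// Hypothetical values; only the from(tablePath, dbName, tableName) factory and
// the three getters read back by updateAndCreateTable are exercised here.
public class IdentifierSketch {
  public static void main(String[] args) {
    AbsoluteTableIdentifier identifier =
        AbsoluteTableIdentifier.from("/tmp/carbon/store/default/t1", "default", "t1");
    System.out.println(identifier.getTablePath());    // /tmp/carbon/store/default/t1
    System.out.println(identifier.getDatabaseName()); // default
    System.out.println(identifier.getTableName());    // t1
  }
}
```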
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
index 233f28d..8a37989 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -38,5 +38,5 @@
override def genCode(ctx: CodegenContext): ExprCode = nonDt.genCode(ctx)
- override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy()
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 9f203c8..ded87b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -164,7 +164,6 @@
}
private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
- LOGGER.info("Trying to drop DataMap from system folder")
try {
if (dataMapSchema == null) {
dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b90faa7..130580d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -690,13 +690,13 @@
attr.dataType
}
}
- CarbonToSparkAdapter.createAttributeReference(
- attr.name,
+ CarbonToSparkAdapter.createAttributeReference(attr.name,
updatedDataType,
attr.nullable,
attr.metadata,
attr.exprId,
- attr.qualifier)
+ attr.qualifier,
+ attr)
}
// Only select the required columns
var output = if (partition.nonEmpty) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index d3f8079..c12ff6c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -97,12 +97,12 @@
carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
thriftTable)(sparkSession)
- // when we call
+    // In case of Spark 2.2 and above, when we call
// alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
// in case of add column, spark gets the catalog table and then it itself adds the partition
// columns if the table is partition table for all the new data schema sent by carbon,
// so there will be duplicate partition columns, so send the columns without partition columns
- val cols = if (carbonTable.isHivePartitionTable) {
+ val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains
(col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index cf05a9d..7e66d34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -163,7 +163,7 @@
if (isDataTypeChange) {
// if column datatype change operation is on partition column, then fail the datatype change
// operation
- if (null != carbonTable.getPartitionInfo) {
+ if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
partitionColumns.asScala.foreach {
col =>
@@ -286,13 +286,14 @@
// update the schema changed column at the specific index in carbonColumns based on schema order
carbonColumns
.update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
- // When we call
+    // In case of Spark 2.2 and above, when we call
// alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
// in case of rename column or change datatype, spark gets the catalog table and then it itself
// adds the partition columns if the table is partition table for all the new data schema sent
// by carbon, so there will be duplicate partition columns, so send the columns without
// partition columns
- val columns = if (carbonTable.isHivePartitionTable) {
+ val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+ carbonTable.isHivePartitionTable) {
val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains(
col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 4cb8a2e..bdc0228 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -151,12 +151,13 @@
val cols = carbonTable.getCreateOrderColumn().asScala
.collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
.filterNot(column => delCols.contains(column))
- // When we call
+ // In case of Spark 2.2 and above, when we call
// alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
// in case of drop column, spark gets the catalog table and then it itself adds the partition
// columns if the table is partition table for all the new data schema sent by carbon,
// so there will be duplicate partition columns, so send the columns without partition columns
- val columns = if (carbonTable.isHivePartitionTable) {
+ val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+ carbonTable.isHivePartitionTable) {
val partitionColumns = partitionInfo.getColumnSchemaList.asScala
val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
Some(carbonColumnsWithoutPartition)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
deleted file mode 100644
index f36e1ac..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.table
-
-import org.apache.spark.sql.{CarbonEnv, CarbonSource, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, MetadataCommand}
-
-/**
- * Command to create table in case of 'USING CARBONDATA' DDL
- *
- * @param catalogTable catalog table created by spark
- * @param ignoreIfExists ignore if table exists
- * @param sparkSession spark session
- */
-case class CarbonCreateDataSourceTableCommand(
- catalogTable: CatalogTable,
- ignoreIfExists: Boolean,
- sparkSession: SparkSession)
- extends MetadataCommand {
-
- override def processMetadata(session: SparkSession): Seq[Row] = {
- // Run the spark command to create table in metastore before saving carbon schema
- // in table path.
- // This is required for spark 2.4, because spark 2.4 will fail to create table
- // if table path is created before hand
- val updatedCatalogTable =
- CarbonSource.updateCatalogTableWithCarbonSchema(catalogTable, session, None, false)
- val sparkCommand = CreateDataSourceTableCommand(updatedCatalogTable, ignoreIfExists)
- sparkCommand.run(session)
-
- // save the table info (carbondata's schema) in table path
- val tableName = catalogTable.identifier.table
- val dbName = CarbonEnv.getDatabaseName(catalogTable.identifier.database)(session)
- val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(session)
- val tableInfo = CarbonSource.createTableInfo(
- dbName, tableName, tablePath, catalogTable.schema, catalogTable.properties, None, session)
- val metastore = CarbonEnv.getInstance(session).carbonMetaStore
- metastore.saveToDisk(tableInfo, tablePath)
-
- Seq.empty
- }
-
- override protected def opName: String = "CREATE TABLE USING CARBONDATA"
-
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 2a5d78b..4adb1aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -207,9 +207,8 @@
} catch {
case _: Exception => // No operation
}
- throw e
val msg = s"Create table'$tableName' in database '$dbName' failed"
- throwMetadataException(dbName, tableName, s"$msg, ${e.getMessage}")
+ throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
}
}
val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 8700b29..54a5757 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -134,7 +134,6 @@
}
val indexDatamapSchemas =
DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- LOGGER.info(s"Dropping DataMaps in table $tableName, size: " + indexDatamapSchemas.size())
if (!indexDatamapSchemas.isEmpty) {
childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
val command = CarbonDropDataMapCommand(schema.getDataMapName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 8acc749..2e1f91f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -142,13 +142,10 @@
val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
val attrs = projectExprsNeedToDecode.map { attr =>
- val newAttr = CarbonToSparkAdapter.createAttributeReference(
- attr.name,
+ val newAttr = AttributeReference(attr.name,
attr.dataType,
attr.nullable,
- attr.metadata,
- attr.exprId,
- Option(table.carbonRelation.tableName))
+ attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
relation.addAttribute(newAttr)
newAttr
}
@@ -197,7 +194,8 @@
attr.nullable,
attr.metadata,
attr.exprId,
- attr.qualifier)
+ attr.qualifier,
+ attr)
}
}
partitions =
@@ -405,7 +403,7 @@
newProjectList :+= reference
a.transform {
case s: ScalaUDF =>
- CarbonToSparkAdapter.createScalaUDF(s, reference)
+ ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
}
case other => other
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 49a3d3b..a851bc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,7 @@
import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand}
import org.apache.spark.sql.execution.command.schema._
-import org.apache.spark.sql.execution.command.table.{CarbonCreateDataSourceTableCommand, CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
@@ -171,7 +171,7 @@
ExecutedCommandExec(addColumn) :: Nil
}
// TODO: remove this else if check once the 2.1 version is unsupported by carbon
- } else {
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
.map {
a =>
@@ -185,6 +185,8 @@
.invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
Nil
// TODO: remove this else check once the 2.1 version is unsupported by carbon
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -273,12 +275,9 @@
if table.provider.get != DDLUtils.HIVE_PROVIDER
&& (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| table.provider.get.equalsIgnoreCase("carbondata")) =>
- val cmd = if (SparkUtil.isSparkVersionEqualTo("2.4")) {
- CarbonCreateDataSourceTableCommand(table, ignoreIfExists, sparkSession)
- } else {
- val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
- CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
- }
+ val updatedCatalog = CarbonSource
+ .updateCatalogTableWithCarbonSchema(table, sparkSession)
+ val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
ExecutedCommandExec(cmd) :: Nil
case AlterTableSetPropertiesCommand(tableName, properties, isView)
if CarbonEnv.getInstance(sparkSession).carbonMetaStore
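
The DDLStrategy hunk above reintroduces `SparkUtil.isSparkVersionXandAbove("2.2")` gates. As a rough, hypothetical re-implementation (not CarbonData's actual SparkUtil), such a check only needs to compare the major.minor prefix of the running Spark version:

```
// Illustrative sketch; the real SparkUtil may differ.
def isVersionAndAbove(sparkVersion: String, required: String): Boolean = {
  def majorMinor(v: String): (Int, Int) = {
    val parts = v.split("\\.")
    (parts(0).toInt, parts(1).toInt)
  }
  val (major, minor) = majorMinor(sparkVersion)
  val (reqMajor, reqMinor) = majorMinor(required)
  major > reqMajor || (major == reqMajor && minor >= reqMinor)
}

// isVersionAndAbove("2.3.2", "2.2") == true; isVersionAndAbove("2.1.0", "2.2") == false
```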
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index 5e82eb3..fd7defa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -26,8 +26,8 @@
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index eda131f..5323293 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -78,7 +78,9 @@
"Update operation is not supported for mv datamap table")
}
}
- val tableRelation =
+ val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ relation
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
alias match {
case Some(_) =>
CarbonReflectionUtils.getSubqueryAlias(
@@ -88,6 +90,9 @@
Some(table.tableIdentifier))
case _ => relation
}
+ } else {
+ throw new UnsupportedOperationException("Unsupported Spark version.")
+ }
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
@@ -216,15 +221,21 @@
}
}
// include tuple id in subquery
- alias match {
- case Some(_) =>
- val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
- sparkSession,
- alias,
- relation,
- Some(table.tableIdentifier))
- Project(projList, subqueryAlias)
- case _ => Project(projList, relation)
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ Project(projList, relation)
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ alias match {
+ case Some(_) =>
+ val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
+ sparkSession,
+ alias,
+ relation,
+ Some(table.tableIdentifier))
+ Project(projList, subqueryAlias)
+ case _ => Project(projList, relation)
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported Spark version.")
}
}
CarbonProjectForDeleteCommand(
@@ -303,7 +314,13 @@
}
}
val newChild: LogicalPlan = if (newChildOutput == child.output) {
- throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+ CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
+ } else {
+ throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
+ }
} else {
Project(newChildOutput, child)
}
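
The branch above reads the child plan through `CarbonReflectionUtils.getField("child", p)` on Spark 2.1 and `"query"` on Spark 2.2, because the field was renamed between versions. A minimal, hypothetical field reader (not the actual CarbonReflectionUtils code) could look like this:

```
// Hedged sketch using plain Java reflection; the real helper may cache fields
// or walk superclasses.
def getFieldByName[T](fieldName: String, obj: AnyRef): T = {
  val field = obj.getClass.getDeclaredField(fieldName)
  field.setAccessible(true)
  field.get(obj).asInstanceOf[T]
}

// Usage idea mirroring the rule above (names assumed):
//   val child = if (isSpark21) getFieldByName[LogicalPlan]("child", p)
//               else getFieldByName[LogicalPlan]("query", p)
```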
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 849d7ba..b2ba7f4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -200,7 +200,8 @@
carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
carbonDatasourceHadoopRelation.carbonRelation
case SubqueryAlias(_, c)
- if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
+ (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
c.getClass.getName.equals(
"org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
@@ -523,7 +524,8 @@
carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
carbonDataSourceHadoopRelation
case SubqueryAlias(_, c)
- if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
+ (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
c.getClass.getName
.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
c.getClass.getName.equals(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 68c293b..9b3ff87 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -18,11 +18,11 @@
import java.util.LinkedHashSet
+import scala.Array.canBuildFrom
import scala.collection.JavaConverters._
-import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{CarbonMetastoreTypes, SparkTypeConverter}
@@ -119,8 +119,7 @@
val dataType = SparkTypeConverter.addDecimalScaleAndPrecision(column, dType)
CarbonMetastoreTypes.toDataType(dataType)
}
- CarbonToSparkAdapter.createAttributeReference(
- column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
+ AttributeReference(column.getColName, output, nullable = true)(
qualifier = Option(tableName + "." + column.getColName))
} else {
val output = CarbonMetastoreTypes.toDataType {
@@ -130,8 +129,7 @@
case others => others
}
}
- CarbonToSparkAdapter.createAttributeReference(
- column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
+ AttributeReference(column.getColName, output, nullable = true)(
qualifier = Option(tableName + "." + column.getColName))
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index dcba730..20d43df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -26,9 +26,11 @@
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
/**
- * This interface defines those common api used by carbon for spark integration,
+ * This interface defines the common APIs used by carbon for Spark 2.1 and Spark 2.2 integration,
* but are not defined in SessionCatalog or HiveSessionCatalog to give contract to the
* Concrete implementation classes.
+ * For example, CarbonSessionCatalog is defined separately for 2.1 and 2.2.
+ *
*/
@InterfaceAudience.Internal
@InterfaceStability.Stable
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index e288e6d..2765c5f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -315,13 +315,15 @@
* @return
*/
def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
- val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
- .asInstanceOf[Option[Expression]]
- if (trimStr.isDefined) {
- false
- } else {
- true
+ var isCompatible = true
+ if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+ val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+ .asInstanceOf[Option[Expression]]
+ if (trimStr.isDefined) {
+ isCompatible = false
+ }
}
+ isCompatible
}
def transformExpression(expr: Expression): CarbonExpression = {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4866301..10b661a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -305,7 +305,7 @@
}
}
val (selectStmt, relation) =
- if (selectPattern.findFirstIn(sel.toLowerCase).isEmpty ||
+ if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
!StringUtils.isEmpty(subQueryResults)) {
// if subQueryResults are not empty means, it is not join with main table.
// so use subQueryResults in update with value flow.
@@ -348,6 +348,8 @@
rel
}
+
+
private def updateRelation(
r: UnresolvedRelation,
tableIdent: Seq[String],
@@ -360,6 +362,8 @@
case Seq(dbName, tableName) => Some(tableName)
case Seq(tableName) => Some(tableName)
}
+ // Use reflection to choose between Spark 2.1 and Spark 2.2.
+ // Construct UnresolvedRelation(tableIdentifier, tableAlias) via reflection.
CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias)
}
}
@@ -382,6 +386,8 @@
val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
+ // Use reflection to choose between Spark 2.1 and Spark 2.2.
+ // Construct (UnresolvedRelation(tableIdent, alias), tableIdent, alias) via reflection.
val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias)
(unresolvedRelation, tableIdent, alias, tableIdentifier)
@@ -672,7 +678,6 @@
val name = field.column.toLowerCase
field.copy(column = name, name = Some(name))
}
-
protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ DROP ~ COLUMNS ~
("(" ~> rep1sep(ident, ",") <~ ")") <~ opt(";") ^^ {
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..79a6240
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+
+ def addSparkListener(sparkContext: SparkContext) = {
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ SparkSession.sqlListener.set(null)
+ }
+ })
+ }
+
+ def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+ metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+ attrRef: NamedExpression): AttributeReference = {
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+ metadata)(exprId, qualifier, attrRef.isGenerated)
+ }
+
+ def createAliasRef(child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr: Option[NamedExpression] = None): Alias = {
+ val isGenerated: Boolean = if (namedExpr.isDefined) {
+ namedExpr.get.isGenerated
+ } else {
+ false
+ }
+ Alias(child, name)(exprId, qualifier, explicitMetadata, isGenerated)
+ }
+
+ def getExplainCommandObj() : ExplainCommand = {
+ ExplainCommand(OneRowRelation)
+ }
+
+ def getPartitionKeyFilter(
+ partitionSet: AttributeSet,
+ filterPredicates: Seq[Expression]): ExpressionSet = {
+ ExpressionSet(
+ ExpressionSet(filterPredicates)
+ .filter(_.references.subsetOf(partitionSet)))
+ }
+
+ def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+ map: Map[String, String],
+ tablePath: String): CatalogStorageFormat = {
+ storageFormat.copy(properties = map, locationUri = Some(tablePath))
+ }
+
+ def getOptimizeCodegenRule(conf: SQLConf): Seq[Rule[LogicalPlan]] = {
+ Seq(OptimizeCodegen(conf))
+ }
+}
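
This restored spark2.1 adapter exposes the same method names as its 2.2/2.3 counterparts, so shared code never touches version-specific constructor signatures; the build profile decides which spark2.x source folder gets compiled. A toy facade (hypothetical types, not the real signatures) shows the shape of that pattern:

```
// Illustrative only: one trait per concern, one implementation per Spark source folder.
trait ExprAdapter {
  def describeAlias(child: String, name: String, generated: Boolean): String
}

object Spark21ExprAdapter extends ExprAdapter {
  // Spark 2.1 needs the extra `generated` flag; newer adapters would simply ignore it.
  override def describeAlias(child: String, name: String, generated: Boolean): String =
    s"Alias($child AS $name, generated=$generated)"
}

// Shared code only sees the trait:
def render(adapter: ExprAdapter): String =
  adapter.describeAlias("col1 + 1", "c", generated = false)
```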
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
similarity index 88%
copy from integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
copy to integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
index 7b20c06..69ed477 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation}
import org.apache.spark.sql.types.StructType
object MixedFormatHandlerUtil {
@@ -33,13 +32,13 @@
dataFilters: Seq[Expression],
tableIdentifier: Option[TableIdentifier]
): FileSourceScanExec = {
+ val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
FileSourceScanExec(
relation,
output,
outputSchema,
partitionFilters,
- None,
- dataFilters,
+ pushedDownFilters,
tableIdentifier)
}
}
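
In the restored 2.1 variant above, the data filters are first translated with `DataSourceStrategy.translateFilter` and only the translatable ones are pushed to the scan. The toy ADT below (invented for illustration, not Spark's classes) captures the flatMap-over-Option pattern:

```
// Hedged sketch: predicates that cannot be translated are dropped from the pushdown set.
sealed trait Predicate
case class Eq(column: String, value: Any) extends Predicate
case class Custom(description: String) extends Predicate // e.g. a UDF, not translatable

sealed trait SourceFilter
case class EqualToFilter(column: String, value: Any) extends SourceFilter

def translate(p: Predicate): Option[SourceFilter] = p match {
  case Eq(c, v)  => Some(EqualToFilter(c, v))
  case Custom(_) => None
}

val pushedDown = Seq(Eq("id", 1), Custom("udf(id)")).flatMap(translate)
// pushedDown == Seq(EqualToFilter("id", 1))
```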
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
new file mode 100644
index 0000000..eb3e88d
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import com.google.common.base.Objects
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
+ */
+case class HiveTableRelation(
+ tableMeta: CatalogTable,
+ dataCols: Seq[AttributeReference],
+ partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
+ assert(tableMeta.identifier.database.isDefined)
+ assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
+ assert(tableMeta.schema.sameType(dataCols.toStructType))
+
+ // The partition column should always appear after data columns.
+ override def output: Seq[AttributeReference] = dataCols ++ partitionCols
+
+ def isPartitioned: Boolean = partitionCols.nonEmpty
+
+ override def equals(relation: Any): Boolean = relation match {
+ case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ Objects.hashCode(tableMeta.identifier, output)
+ }
+
+ override def newInstance(): HiveTableRelation = copy(
+ dataCols = dataCols.map(_.newInstance()),
+ partitionCols = partitionCols.map(_.newInstance()))
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
new file mode 100644
index 0000000..9a88255
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class MigrateOptimizer {
+
+}
+
+/**
+ * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
+ */
+object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Deduplicate(keys, child, streaming) if !streaming =>
+ val keyExprIds = keys.map(_.exprId)
+ val aggCols = child.output.map { attr =>
+ if (keyExprIds.contains(attr.exprId)) {
+ attr
+ } else {
+ Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
+ }
+ }
+ Aggregate(keys, aggCols, child)
+ }
+}
+
+/** A logical plan for `dropDuplicates`. */
+case class Deduplicate(
+ keys: Seq[Attribute],
+ child: LogicalPlan,
+ streaming: Boolean) extends UnaryNode {
+
+ override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * Remove projections from the query plan that do not make any modifications.
+ */
+object RemoveRedundantProject extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case p @ Project(_, child) if p.output == child.output => child
+ }
+}
+
+/**
+ * push down operations into [[CreateNamedStructLike]].
+ */
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformExpressionsUp {
+ // push down field extraction
+ case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
+ createNamedStructLike.valExprs(ordinal)
+ }
+ }
+}
+
+/**
+ * push down operations into [[CreateArray]].
+ */
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformExpressionsUp {
+ // push down field selection (array of structs)
+ case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
+ // instead of selecting the field on the entire array,
+ // select it from each member of the array.
+ // pushing down the operation this way opens other optimization opportunities
+ // (i.e. struct(...,x,...).x)
+ CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
+ // push down item selection.
+ case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+ // instead of creating the array and then selecting one row,
+ // remove array creation altogether.
+ if (idx >= 0 && idx < elems.size) {
+ // valid index
+ elems(idx)
+ } else {
+ // out of bounds, mimic the runtime behavior and return null
+ Literal(null, ga.dataType)
+ }
+ }
+ }
+}
+
+/**
+ * push down operations into [[CreateMap]].
+ */
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformExpressionsUp {
+ case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
+ }
+ }
+}
+
+
+/**
+ * Removes MapObjects when the following conditions are satisfied
+ * 1. MapObjects(... LambdaVariable(..., false) ...), which means the input and output types
+ * are primitive and non-nullable
+ * 2. no custom collection class is specified to represent the data items.
+ */
+object EliminateMapObjects extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData
+ }
+}
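
The restored MigrateOptimizer rules above rewrite expressions such as `GetArrayItem(CreateArray(...), idx)` so the intermediate array is never built. The toy expression ADT below (not Catalyst) demonstrates the same rewrite, including the out-of-bounds case that the rule maps to a null literal:

```
// Illustrative sketch on a toy ADT.
sealed trait Expr
case class MakeArray(elems: Seq[Expr]) extends Expr
case class GetItem(array: Expr, index: Int) extends Expr
case class Lit(value: Any) extends Expr

def simplify(e: Expr): Expr = e match {
  case GetItem(MakeArray(elems), i) if i >= 0 && i < elems.size => elems(i)
  case GetItem(MakeArray(_), _) => Lit(null) // out of bounds: mimic runtime behaviour
  case other => other
}

// simplify(GetItem(MakeArray(Seq(Lit(1), Lit(2))), 1)) == Lit(2)
```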
diff --git a/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
similarity index 65%
rename from integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
rename to integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
index a2ca9e6..4abf189 100644
--- a/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.carbondata.spark.adapter
+package org.apache.spark.sql.catalyst.plans.logical
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
-
-object CarbonToSparkAdapter {
- def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
- FilePartition(index, files.toArray.toSeq)
- }
-}
+/**
+ * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
+ * recognize a subquery as such, and it allows us to write subquery aware transformations.
+ */
+case class Subquery(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
new file mode 100644
index 0000000..8eb05fc
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * To initialize the default values of dynamic parameters
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+ val carbonProperties = CarbonProperties.getInstance()
+
+ /**
+ * To initialize dynamic parameter defaults along with usage docs
+ */
+ def addDefaultCarbonParams(): Unit = {
+ val ENABLE_UNSAFE_SORT =
+ SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+ .doc("To enable/ disable unsafe sort.")
+ .booleanConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+ val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+ SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+ .doc("To set carbon task distribution.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+ val BAD_RECORDS_LOGGER_ENABLE =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+ .doc("To enable/ disable carbon bad record logger.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants
+ .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+ val BAD_RECORDS_ACTION =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+ .doc("To configure the bad records action.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+ val IS_EMPTY_DATA_BAD_RECORD =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+ .doc("Property to decide weather empty data to be considered bad/ good record.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+ .toBoolean)
+ val SORT_SCOPE =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+ .doc("Property to specify sort scope.")
+ .stringConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+ val BAD_RECORD_PATH =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+ .doc("Property to configure the bad record location.")
+ .stringConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+ val GLOBAL_SORT_PARTITIONS =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+ .doc("Property to configure the global sort partitions.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+ val DATEFORMAT =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+ .doc("Property to configure data format for date type columns.")
+ .stringConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+ val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
+ "carbon.input.segments.<database_name>.<table_name>")
+ .doc("Property to configure the list of segments to query.").stringConf
+ .createWithDefault(carbonProperties
+ .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+ }
+ /**
+ * To set the default values of the dynamic properties
+ */
+ def addDefaultCarbonSessionParams(): Unit = {
+ sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+ sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+ carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+ }
+}
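
The restored CarbonSQLConf registers these options as dynamic SQL conf entries with defaults. Assuming a running SparkSession named `spark` with CarbonData on the classpath, a session can still override them at runtime through the normal conf API; the values below are only examples:

```
// Usage sketch; property values shown are illustrative.
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants

spark.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, "LOCAL_SORT")
spark.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, "FORCE")
```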
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..6b94806
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.spark.util.CarbonReflectionUtils
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the carbon
+ * table in the carbon catalog is not the same as the cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(
+ externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
+ sparkSession: SparkSession,
+ functionResourceLoader: FunctionResourceLoader,
+ functionRegistry: FunctionRegistry,
+ conf: SQLConf,
+ hadoopConf: Configuration)
+ extends HiveSessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ hadoopConf) with CarbonSessionCatalog {
+
+ private lazy val carbonEnv = {
+ val env = new CarbonEnv
+ env.init(sparkSession)
+ env
+ }
+ /**
+ * Returns the carbonEnv instance
+ * @return
+ */
+ override def getCarbonEnv() : CarbonEnv = {
+ carbonEnv
+ }
+
+ def alterAddColumns(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[ColumnSchema]])
+ : Unit = {
+ alterTable(tableIdentifier, schemaParts, cols)
+ }
+
+ def alterDropColumns(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[ColumnSchema]])
+ : Unit = {
+ alterTable(tableIdentifier, schemaParts, cols)
+ }
+
+ def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[ColumnSchema]])
+ : Unit = {
+ alterTable(tableIdentifier, schemaParts, cols)
+ }
+
+ // Initialize all listeners to the Operation bus.
+ CarbonEnv.init
+
+ /**
+ * This method will invalidate the carbon relation from the cache if the carbon table is
+ * updated in the carbon catalog
+ *
+ * @param name
+ * @param alias
+ * @return
+ */
+ override def lookupRelation(name: TableIdentifier,
+ alias: Option[String]): LogicalPlan = {
+ val rtnRelation = super.lookupRelation(name, alias)
+ var toRefreshRelation = false
+ rtnRelation match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
+ toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
+ case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+ toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
+ case _ =>
+ }
+
+ if (toRefreshRelation) {
+ super.lookupRelation(name, alias)
+ } else {
+ rtnRelation
+ }
+ }
+
+ /**
+ * Returns the hive client from the session state
+ *
+ * @return
+ */
+ override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ }
+
+ override def createPartitions(
+ tableName: TableIdentifier,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ try {
+ val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+ super.createPartitions(tableName, updatedParts, ignoreIfExists)
+ } catch {
+ case e: Exception =>
+ super.createPartitions(tableName, parts, ignoreIfExists)
+ }
+ }
+
+ /**
+ * This is an alternate way of getting partition information. It first fetches all partitions from
+ * hive and then applies the filters, instead of querying hive along with the filters.
+ * @param partitionFilters
+ * @param sparkSession
+ * @param identifier
+ * @return
+ */
+ def getPartitionsAlternate(
+ partitionFilters: Seq[Expression],
+ sparkSession: SparkSession,
+ identifier: TableIdentifier) = {
+ val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
+ val partitionSchema = catalogTable.partitionSchema
+ if (partitionFilters.nonEmpty) {
+ val boundPredicate =
+ InterpretedPredicate.create(partitionFilters.reduce(And).transform {
+ case att: AttributeReference =>
+ val index = partitionSchema.indexWhere(_.name == att.name)
+ BoundReference(index, partitionSchema(index).dataType, nullable = true)
+ })
+ allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
+ } else {
+ allPartitions
+ }
+ }
+
+ /**
+ * Update the storage format with the new location information
+ */
+ override def updateStorageLocation(
+ path: Path,
+ storage: CatalogStorageFormat,
+ newTableName: String,
+ dbName: String): CatalogStorageFormat = {
+ storage.copy(locationUri = Some(path.toString))
+ }
+}
+
+/**
+ * Session state implementation to override the SQL parser and add extra strategies
+ * @param sparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+ experimentalMethods.extraStrategies = extraStrategies
+
+ experimentalMethods.extraOptimizations = extraOptimizations
+
+ def extraStrategies: Seq[Strategy] = {
+ Seq(
+ new StreamingTableStrategy(sparkSession),
+ new CarbonLateDecodeStrategy,
+ new DDLStrategy(sparkSession)
+ )
+ }
+
+ def extraOptimizations: Seq[Rule[LogicalPlan]] = {
+ Seq(new CarbonIUDRule,
+ new CarbonUDFTransformRule,
+ new CarbonLateDecodeRule)
+ }
+
+ override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+ def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
+ def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
+ catalog.ParquetConversions ::
+ catalog.OrcConversions ::
+ CarbonPreInsertionCasts(sparkSession) ::
+ CarbonIUDAnalysisRule(sparkSession) ::
+ AnalyzeCreateTable(sparkSession) ::
+ PreprocessTableInsertion(conf) ::
+ DataSourceAnalysis(conf) ::
+ (if (conf.runSQLonFile) {
+ new ResolveDataSource(sparkSession) :: Nil
+ } else { Nil })
+ }
+
+ override lazy val analyzer: Analyzer =
+ new CarbonAnalyzer(catalog, conf, sparkSession,
+ new Analyzer(catalog, conf) {
+ override val extendedResolutionRules =
+ if (extendedAnalyzerRules.nonEmpty) {
+ extendedAnalyzerRules ++ internalAnalyzerRules
+ } else {
+ internalAnalyzerRules
+ }
+ override val extendedCheckRules = Seq(
+ PreWriteCheck(conf, catalog))
+ }
+ )
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ override lazy val catalog = {
+ new CarbonHiveSessionCatalog(
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+ sparkSession.sharedState.globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ newHadoopConf())
+ }
+}
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+ conf: CatalystConf,
+ sparkSession: SparkSession,
+ analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+ val mvPlan = try {
+ CarbonReflectionUtils.createObject(
+ "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
+ sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
+ } catch {
+ case e: Exception =>
+ null
+ }
+
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ val logicalPlan = analyzer.execute(plan)
+ if (mvPlan != null) {
+ mvPlan.apply(logicalPlan)
+ } else {
+ logicalPlan
+ }
+ }
+}
+
+class CarbonOptimizer(
+ catalog: SessionCatalog,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods)
+ extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+ super.execute(transFormedPlan)
+ }
+}
+
+object CarbonOptimizerUtil {
+ def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
+ // In case of a scalar subquery, add a flag in the relation to skip the decoder plan in the
+ // optimizer rule, and optimize the whole plan at once.
+ val transFormedPlan = plan.transform {
+ case filter: Filter =>
+ filter.transformExpressions {
+ case s: ScalarSubquery =>
+ val tPlan = s.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ ScalarSubquery(tPlan, s.children, s.exprId)
+ case p: PredicateSubquery =>
+ val tPlan = p.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
+ }
+ }
+ transFormedPlan
+ }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+ extends SparkSqlAstBuilder(conf) {
+
+ val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+ val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
+
+ if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("carbondata") ||
+ fileStorage.equalsIgnoreCase("'carbonfile'") ||
+ fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+ val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
+ ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+ Option(ctx.STRING()).map(string),
+ ctx.AS, ctx.query, fileStorage)
+ helper.createCarbonTable(createTableTuple)
+ } else {
+ super.visitCreateTable(ctx)
+ }
+ }
+
+ override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
+ withOrigin(ctx) {
+ if (CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
+ CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
+ super.visitShowTables(ctx)
+ } else {
+ CarbonShowTablesCommand(
+ Option(ctx.db).map(_.getText),
+ Option(ctx.pattern).map(string))
+ }
+ }
+ }
+
+ override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
+ CarbonExplainCommand(super.visitExplain(ctx))
+ }
+}
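
CarbonAnalyzer above delegates to the wrapped Hive analyzer and then applies the reflectively loaded MV rule only when it was found. Stripped of Catalyst types, the delegation pattern is just this (toy generic sketch, not the actual class):

```
// Illustrative sketch: run the base analysis, then an optional extra rule.
trait Rule[T] { def apply(plan: T): T }

class DelegatingAnalyzer[T](base: T => T, extraRule: Option[Rule[T]]) {
  def execute(plan: T): T = {
    val analyzed = base(plan)
    extraRule.fold(analyzed)(_.apply(analyzed))
  }
}
```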
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..dd690e4
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,165 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{
+ CatalogRelation, CatalogTable, CatalogTableType,
+ SimpleCatalogRelation
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{
+ AlterTableRecoverPartitionsCommand, DDLUtils,
+ RunnableCommand
+}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Create a table using the carbondata provider and insert the query result into it.
+ * @param table the CatalogTable describing the table to create
+ * @param mode SaveMode: Ignore, Overwrite, ErrorIfExists or Append
+ * @param query the query whose result will be inserted into the new relation
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+ table: CatalogTable,
+ mode: SaveMode,
+ query: LogicalPlan)
+ extends RunnableCommand {
+
+ override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ assert(table.tableType != CatalogTableType.VIEW)
+ assert(table.provider.isDefined)
+ assert(table.schema.isEmpty)
+
+ val provider = table.provider.get
+ val sessionState = sparkSession.sessionState
+ val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentWithDB = table.identifier.copy(database = Some(db))
+ val tableName = tableIdentWithDB.unquotedString
+
+ var createMetastoreTable = false
+ var existingSchema = Option.empty[StructType]
+ if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+ // Check if we need to throw an exception or just return.
+ mode match {
+ case SaveMode.ErrorIfExists =>
+ throw new AnalysisException(s"Table $tableName already exists. " +
+ s"If you are using saveAsTable, you can set SaveMode to " +
+ s"SaveMode.Append to " +
+ s"insert data into the table or set SaveMode to SaveMode" +
+                                        s".Overwrite to overwrite " +
+ s"the existing data. " +
+ s"Or, if you are using SQL CREATE TABLE, you need to drop " +
+ s"$tableName first.")
+ case SaveMode.Ignore =>
+ // Since the table already exists and the save mode is Ignore, we will just return.
+ return Seq.empty[Row]
+ case SaveMode.Append =>
+ // Check if the specified data source match the data source of the existing table.
+ val existingProvider = DataSource.lookupDataSource(provider)
+ // TODO: Check that options from the resolved relation match the relation that we are
+ // inserting into (i.e. using the same compression).
+
+ // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+ // views unexpectedly.
+ EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+ case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+ // check if the file formats match
+ l.relation match {
+ case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
+ throw new AnalysisException(
+ s"The file format of the existing table $tableName is " +
+ s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
+ s"format `$provider`")
+ case _ =>
+ }
+ if (query.schema.size != l.schema.size) {
+ throw new AnalysisException(
+ s"The column number of the existing schema[${ l.schema }] " +
+ s"doesn't match the data schema[${ query.schema }]'s")
+ }
+ existingSchema = Some(l.schema)
+ case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+ existingSchema = Some(s.metadata.schema)
+ case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
+ throw new AnalysisException("Saving data in the Hive serde table " +
+ s"${ c.catalogTable.identifier } is not supported yet. " +
+                                        s"Please use the insertInto() API as an alternative.")
+ case o =>
+ throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
+ }
+ case SaveMode.Overwrite =>
+ sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+ // Need to create the table again.
+ createMetastoreTable = true
+ }
+ } else {
+ // The table does not exist. We need to create it in metastore.
+ createMetastoreTable = true
+ }
+
+ val data = Dataset.ofRows(sparkSession, query)
+ val df = existingSchema match {
+ // If we are inserting into an existing table, just use the existing schema.
+ case Some(s) => data.selectExpr(s.fieldNames: _*)
+ case None => data
+ }
+
+ val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+ Some(sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.locationUri
+ }
+
+ // Create the relation based on the data of df.
+ val pathOption = tableLocation.map("path" -> _)
+ val dataSource = DataSource(
+ sparkSession,
+ className = provider,
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(table))
+
+ val result = try {
+ dataSource.write(mode, df)
+ } catch {
+ case ex: AnalysisException =>
+ logError(s"Failed to write to table $tableName in $mode mode", ex)
+ throw ex
+ }
+ result match {
+ case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+ sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+ // Need to recover partitions into the metastore so our saved data is visible.
+ sparkSession.sessionState.executePlan(
+ AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+ case _ =>
+ }
+
+ // Refresh the cache of the table in the catalog.
+ sessionState.catalog.refreshTable(tableIdentWithDB)
+ Seq.empty[Row]
+ }
+}
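
As a usage illustration only (not from the patch), the CTAS command above is typically reached either from a SQL CREATE TABLE ... AS SELECT statement or from DataFrameWriter.saveAsTable; the table names below are hypothetical and a Carbon-enabled SparkSession with an existing `sales` table is assumed:

```
import org.apache.spark.sql.{SaveMode, SparkSession}

object CtasSketch {
  def run(spark: SparkSession): Unit = {
    // SQL CTAS: parsed into (catalog table, SaveMode.ErrorIfExists, query).
    spark.sql(
      """CREATE TABLE sales_2019 USING carbondata
        |AS SELECT * FROM sales WHERE year = 2019""".stripMargin)

    // DataFrameWriter path: SaveMode.Append reuses the existing table schema,
    // SaveMode.Overwrite drops and recreates the metastore entry,
    // SaveMode.Ignore returns without writing when the table already exists.
    spark.table("sales")
      .where("year = 2020")
      .write
      .format("carbondata")
      .mode(SaveMode.Append)
      .saveAsTable("sales_2019")
  }
}
```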
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
new file mode 100644
index 0000000..446b5a5
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+
+ def addSparkListener(sparkContext: SparkContext) = {
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ SparkSession.sqlListener.set(null)
+ }
+ })
+ }
+
+ def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+      metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+ attrRef : NamedExpression): AttributeReference = {
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+      metadata)(exprId, qualifier, attrRef.isGenerated)
+ }
+
+ def createAliasRef(child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr: Option[NamedExpression] = None): Alias = {
+    val isGenerated: Boolean = if (namedExpr.isDefined) {
+ namedExpr.get.isGenerated
+ } else {
+ false
+ }
+    Alias(child, name)(exprId, qualifier, explicitMetadata, isGenerated)
+ }
+
+ def getExplainCommandObj() : ExplainCommand = {
+ ExplainCommand(OneRowRelation)
+ }
+
+ def getPartitionKeyFilter(
+ partitionSet: AttributeSet,
+ filterPredicates: Seq[Expression]): ExpressionSet = {
+ ExpressionSet(
+ ExpressionSet(filterPredicates)
+ .filter(_.references.subsetOf(partitionSet)))
+ }
+
+  def getOptimizeCodegenRule(conf: SQLConf): Seq[Rule[LogicalPlan]] = {
+ Seq(OptimizeCodegen(conf))
+ }
+
+ def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+ map: Map[String, String],
+ tablePath: String): CatalogStorageFormat = {
+ storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+ }
+}
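
A short sketch (not part of the patch) of how version-shared Carbon code is expected to go through this adapter rather than the Spark constructors directly, so the same call site compiles against each supported Spark profile; the helper names below are illustrative:

```
import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression}
import org.apache.spark.sql.types.IntegerType

object AdapterUsageSketch {
  // Re-create an attribute while preserving exprId, qualifier and the
  // version-specific isGenerated flag carried by the original expression.
  def copyAttribute(attr: AttributeReference): AttributeReference =
    CarbonToSparkAdapter.createAttributeReference(
      attr.name, attr.dataType, attr.nullable, attr.metadata,
      attr.exprId, attr.qualifier, attr)

  // Build an alias without touching the version-specific Alias constructor;
  // the default arguments supply a fresh ExprId and no qualifier.
  def aliasLiteralOne(name: String): NamedExpression =
    CarbonToSparkAdapter.createAliasRef(Literal(1, IntegerType), name)
}
```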
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
similarity index 95%
rename from integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
rename to integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
index 808099d..2f9ad3e 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -23,14 +23,15 @@
import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.internal.SQLConf
+
class CarbonOptimizer(
catalog: SessionCatalog,
conf: SQLConf,
experimentalMethods: ExperimentalMethods)
- extends SparkOptimizer(catalog, experimentalMethods) {
+ extends SparkOptimizer(catalog, conf, experimentalMethods) {
override def execute(plan: LogicalPlan): LogicalPlan = {
val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
super.execute(transFormedPlan)
}
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
similarity index 89%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
copy to integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index 36ec2c8..7177f1a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -31,15 +31,15 @@
val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
- val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat(0))
+ val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
if (fileStorage.equalsIgnoreCase("'carbondata'") ||
fileStorage.equalsIgnoreCase("carbondata") ||
fileStorage.equalsIgnoreCase("'carbonfile'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
- ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
- ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+ val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
+        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+ Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
helper.createCarbonTable(createTableTuple)
} else {
super.visitCreateHiveTable(ctx)
@@ -47,6 +47,6 @@
}
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
- visitAddTableColumns(parser, ctx)
+ visitAddTableColumns(parser,ctx)
}
}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
deleted file mode 100644
index d134de1..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
-
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
- extends LeafExpression with NamedExpression with CodegenFallback {
-
- type EvaluatedType = Any
-
- override def toString: String = s"input[" + colExp.getColIndex + "]"
-
- override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
- override def name: String = colExp.getColumnName
-
- override def toAttribute: Attribute = throw new UnsupportedOperationException
-
- override def exprId: ExprId = throw new UnsupportedOperationException
-
- override def qualifier: Option[String] = null
-
- override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 7003c26..9094dfe 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -23,11 +23,11 @@
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, Metadata}
object CarbonToSparkAdapter {
@@ -41,8 +41,8 @@
}
def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
- metadata: Metadata, exprId: ExprId, qualifier: Option[String],
- attrRef : NamedExpression = null): AttributeReference = {
+ metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+ attrRef : NamedExpression): AttributeReference = {
AttributeReference(
name,
dataType,
@@ -50,23 +50,14 @@
metadata)(exprId, qualifier)
}
- def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
- ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
- }
-
- def createExprCode(code: String, isNull: String, value: String, dataType: DataType = null
- ): ExprCode = {
- ExprCode(code, isNull, value)
- }
-
def createAliasRef(child: Expression,
- name: String,
- exprId: ExprId = NamedExpression.newExprId,
- qualifier: Option[String] = None,
- explicitMetadata: Option[Metadata] = None,
- namedExpr : Option[NamedExpression] = None ) : Alias = {
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr : Option[NamedExpression] = None ) : Alias = {
- Alias(child, name)(exprId, qualifier, explicitMetadata)
+ Alias(child, name)(exprId, qualifier, explicitMetadata)
}
def getExplainCommandObj() : ExplainCommand = {
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
deleted file mode 100644
index bffb900..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.types.StructType
-
-object MixedFormatHandlerUtil {
-
- def getScanForSegments(
- @transient relation: HadoopFsRelation,
- output: Seq[Attribute],
- outputSchema: StructType,
- partitionFilters: Seq[Expression],
- dataFilters: Seq[Expression],
- tableIdentifier: Option[TableIdentifier]
- ): FileSourceScanExec = {
- FileSourceScanExec(
- relation,
- output,
- outputSchema,
- partitionFilters,
- dataFilters,
- tableIdentifier)
- }
-}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 25d5543..5435f04 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.execution.strategy
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
similarity index 93%
rename from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
rename to integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index 36ec2c8..73b6790 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -38,8 +38,8 @@
fileStorage.equalsIgnoreCase("'carbonfile'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
- ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
- ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+          ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0), ctx.locationSpec(0),
+ Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
helper.createCarbonTable(createTableTuple)
} else {
super.visitCreateHiveTable(ctx)
@@ -47,6 +47,6 @@
}
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
- visitAddTableColumns(parser, ctx)
+ visitAddTableColumns(parser,ctx)
}
}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
deleted file mode 100644
index 61959c2..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager}
-import org.apache.spark.sql.hive.client.HiveClient
-
-object SparkAdapter {
- def getExternalCatalogCatalog(catalog: HiveExternalCatalog) = catalog
-
- def getGlobalTempViewManager(manager: GlobalTempViewManager) = manager
-
- def getHiveClient(client: HiveClient) = client
-
- def getHiveExternalCatalog(catalog: ExternalCatalog) = catalog.asInstanceOf[HiveExternalCatalog]
-}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
deleted file mode 100644
index ee4c9ce..0000000
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
- extends LeafExpression with NamedExpression with CodegenFallback {
-
- type EvaluatedType = Any
-
- override def toString: String = s"input[" + colExp.getColIndex + "]"
-
- override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
- override def name: String = colExp.getColumnName
-
- override def toAttribute: Attribute = throw new UnsupportedOperationException
-
- override def exprId: ExprId = throw new UnsupportedOperationException
-
- override def qualifier: Seq[String] = null
-
- override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
deleted file mode 100644
index 082d1ec..0000000
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
- def addSparkListener(sparkContext: SparkContext) = {
- sparkContext.addSparkListener(new SparkListener {
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
- SparkSession.setDefaultSession(null)
- }
- })
- }
-
- def createAttributeReference(
- name: String,
- dataType: DataType,
- nullable: Boolean,
- metadata: Metadata,
- exprId: ExprId,
- qualifier: Option[String],
- attrRef : NamedExpression = null): AttributeReference = {
- val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
- AttributeReference(
- name,
- dataType,
- nullable,
- metadata)(exprId, qf)
- }
-
- def createAttributeReference(
- name: String,
- dataType: DataType,
- nullable: Boolean,
- metadata: Metadata,
- exprId: ExprId,
- qualifier: Seq[String]): AttributeReference = {
- AttributeReference(
- name,
- dataType,
- nullable,
- metadata)(exprId, qualifier)
- }
-
- def createScalaUDF(s: ScalaUDF, reference: AttributeReference) = {
- ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
- }
-
- def createExprCode(code: String, isNull: String, value: String, dataType: DataType) = {
- ExprCode(
- code"$code",
- JavaCode.isNullVariable(isNull),
- JavaCode.variable(value, dataType))
- }
-
- def createAliasRef(
- child: Expression,
- name: String,
- exprId: ExprId = NamedExpression.newExprId,
- qualifier: Seq[String] = Seq.empty,
- explicitMetadata: Option[Metadata] = None,
- namedExpr: Option[NamedExpression] = None) : Alias = {
- Alias(child, name)(exprId, qualifier, explicitMetadata)
- }
-
- def createAliasRef(
- child: Expression,
- name: String,
- exprId: ExprId,
- qualifier: Option[String]) : Alias = {
- Alias(child, name)(exprId,
- if (qualifier.isEmpty) Seq.empty else Seq(qualifier.get),
- None)
- }
-
- def getExplainCommandObj() : ExplainCommand = {
- ExplainCommand(OneRowRelation())
- }
-
- /**
- * As a part of SPARK-24085 Hive tables supports scala subquery for
- * parition tables, so Carbon also needs to supports
- * @param partitionSet
- * @param filterPredicates
- * @return
- */
- def getPartitionKeyFilter(
- partitionSet: AttributeSet,
- filterPredicates: Seq[Expression]): ExpressionSet = {
- ExpressionSet(
- ExpressionSet(filterPredicates)
- .filterNot(SubqueryExpression.hasSubquery)
- .filter(_.references.subsetOf(partitionSet)))
- }
-
- // As per SPARK-22520 OptimizeCodegen is removed in 2.3.1
- def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
- Seq.empty
- }
-
- def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
- map: Map[String, String],
- tablePath: String): CatalogStorageFormat = {
- storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
- }
-}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
deleted file mode 100644
index 8132188..0000000
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener, GlobalTempViewManager}
-import org.apache.spark.sql.hive.client.HiveClient
-
-object SparkAdapter {
- def getExternalCatalogCatalog(catalog: HiveExternalCatalog) =
- () => catalog
-
- def getGlobalTempViewManager(manager: GlobalTempViewManager) =
- () => manager
-
- def getHiveClient(client: HiveClient) =
- () => client
-
- def getHiveExternalCatalog(catalog: ExternalCatalogWithListener) =
- catalog.unwrapped.asInstanceOf[HiveExternalCatalog]
-}
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
index e7ca31b..c1a5862 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -24,7 +24,6 @@
val dataMapName = "bloom_dm"
override protected def beforeAll(): Unit = {
- sqlContext.sparkContext.setLogLevel("info")
deleteFile(bigFile)
new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
createFile(bigFile, line = 2000)
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
diff --git a/pom.xml b/pom.xml
index f74a11b..66214df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -487,7 +487,7 @@
<profiles>
<profile>
- <!--This profile does not build spark module, so user should explicitly give spark profile -->
+      <!--This profile does not build the spark module, so the user should explicitly specify a spark profile as well, e.g. spark-2.1 -->
<id>build-with-format</id>
<modules>
<module>format</module>
@@ -501,6 +501,103 @@
</properties>
</profile>
<profile>
+ <id>spark-2.1</id>
+ <properties>
+ <spark.version>2.1.0</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.eluder.coveralls</groupId>
+ <artifactId>coveralls-maven-plugin</artifactId>
+ <version>4.3.0</version>
+ <configuration>
+ <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
+ <sourceEncoding>UTF-8</sourceEncoding>
+ <jacocoReports>
+ <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
+ </jacocoReport>
+ </jacocoReports>
+ <sourceDirectories>
+ <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
+ </sourceDirectories>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.2</id>
+ <properties>
+ <spark.version>2.2.1</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.eluder.coveralls</groupId>
+ <artifactId>coveralls-maven-plugin</artifactId>
+ <version>4.3.0</version>
+ <configuration>
+ <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
+ <sourceEncoding>UTF-8</sourceEncoding>
+ <jacocoReports>
+ <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
+ </jacocoReport>
+ </jacocoReports>
+ <sourceDirectories>
+ <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
+ </sourceDirectories>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
@@ -529,64 +626,11 @@
<sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/below2.4</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/spark2.3</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/below2.4</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
- </sourceDirectories>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>spark-2.4</id>
- <properties>
- <spark.version>2.4.4</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.eluder.coveralls</groupId>
- <artifactId>coveralls-maven-plugin</artifactId>
- <version>4.3.0</version>
- <configuration>
- <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
- <sourceEncoding>UTF-8</sourceEncoding>
- <jacocoReports>
- <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
- </jacocoReport>
- </jacocoReports>
- <sourceDirectories>
- <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.4</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/spark2.4</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>