[CARBONDATA-3514] Support Spark 2.4.4 integration
This PR adds integration with Spark 2.4.4
This closes #3378
diff --git a/README.md b/README.md
index a34784d..2f661ed 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,8 @@
## Status
-Spark2.2:
-[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.2)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
+Spark2.3:
+[![Build Status](https://builds.apache.org/buildStatus/icon?job=carbondata-master-spark-2.3)](https://builds.apache.org/view/A-D/view/CarbonData/job/carbondata-master-spark-2.2/lastBuild/testReport)
[![Coverage Status](https://coveralls.io/repos/github/apache/carbondata/badge.svg?branch=master)](https://coveralls.io/github/apache/carbondata?branch=master)
<a href="https://scan.coverity.com/projects/carbondata">
<img alt="Coverity Scan Build Status"
diff --git a/build/README.md b/build/README.md
index f361a6e..960ccce 100644
--- a/build/README.md
+++ b/build/README.md
@@ -25,11 +25,9 @@
* [Apache Thrift 0.9.3](http://archive.apache.org/dist/thrift/0.9.3/)
## Build command
-Build with different supported versions of Spark, by default using Spark 2.2.1 to build
+Build with different supported versions of Spark, by default using Spark 2.4.4
```
-mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package
-mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 clean package
-mvn -DskipTests -Pspark-2.3 -Dspark.version=2.3.2 clean package
+mvn -DskipTests -Pspark-2.4 clean package
```
Note:
@@ -39,5 +37,5 @@
## For contributors : To build the format code after any changes, please follow the below command.
Note:Need install Apache Thrift 0.9.3
```
-mvn clean -DskipTests -Pbuild-with-format -Pspark-2.2 package
+mvn clean -DskipTests -Pbuild-with-format -Pspark-2.4 package
```
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index d0e5a42..c18298d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -27,11 +27,11 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -42,12 +42,15 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
/**
* Stores datamap schema in disk as json format
*/
public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStorageProvider {
+ private Logger LOG = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
+
private String storePath;
private String mdtFilePath;
@@ -171,17 +174,15 @@
if (!FileFactory.isFileExist(schemaPath)) {
throw new IOException("DataMap with name " + dataMapName + " does not exists in storage");
}
- Iterator<DataMapSchema> iterator = dataMapSchemas.iterator();
- while (iterator.hasNext()) {
- DataMapSchema schema = iterator.next();
- if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) {
- iterator.remove();
- }
- }
+
+ LOG.info(String.format("Trying to delete DataMap %s schema", dataMapName));
+
+ dataMapSchemas.removeIf(schema -> schema.getDataMapName().equalsIgnoreCase(dataMapName));
touchMDTFile();
if (!FileFactory.deleteFile(schemaPath)) {
throw new IOException("DataMap with name " + dataMapName + " cannot be deleted");
}
+ LOG.info(String.format("DataMap %s schema is deleted", dataMapName));
}
private void checkAndReloadDataMapSchemas(boolean touchFile) throws IOException {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index b32367b..08841b6 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -373,7 +373,8 @@
def updateColumnName(attr: Attribute, counter: Int): String = {
val name = getUpdatedName(attr.name, counter)
- attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
+ val value = attr.qualifier.map(qualifier => qualifier + "_" + name)
+ if (value.nonEmpty) value.head else name
}
def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
@@ -473,7 +474,7 @@
}
def createAttrReference(ref: NamedExpression, name: String): Alias = {
- Alias(ref, name)(exprId = ref.exprId, qualifier = None)
+ CarbonToSparkAdapter.createAliasRef(ref, name, exprId = ref.exprId)
}
case class AttributeKey(exp: Expression) {
@@ -537,13 +538,13 @@
case attr: AttributeReference =>
val uattr = attrMap.get(AttributeKey(attr)).map{a =>
if (keepAlias) {
- CarbonToSparkAdapter.createAttributeReference(a.name,
- a.dataType,
- a.nullable,
- a.metadata,
- a.exprId,
- attr.qualifier,
- a)
+ CarbonToSparkAdapter.createAttributeReference(
+ name = a.name,
+ dataType = a.dataType,
+ nullable = a.nullable,
+ metadata = a.metadata,
+ exprId = a.exprId,
+ qualifier = attr.qualifier)
} else {
a
}
@@ -575,9 +576,9 @@
outputSel.zip(subsumerOutputList).map{ case (l, r) =>
l match {
case attr: AttributeReference =>
- Alias(attr, r.name)(r.exprId, None)
+ CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
case a@Alias(attr: AttributeReference, name) =>
- Alias(attr, r.name)(r.exprId, None)
+ CarbonToSparkAdapter.createAliasRef(attr, r.name, r.exprId)
case other => other
}
}
@@ -594,13 +595,13 @@
val uattr = attrMap.get(AttributeKey(attr)).map{a =>
if (keepAlias) {
CarbonToSparkAdapter
- .createAttributeReference(a.name,
+ .createAttributeReference(
+ a.name,
a.dataType,
a.nullable,
a.metadata,
a.exprId,
- attr.qualifier,
- a)
+ attr.qualifier)
} else {
a
}
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 3ddb0fc..cff5c41 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -125,17 +125,18 @@
arrayBuffer += relation
}
var qualifier: Option[String] = None
- if (attr.qualifier.isDefined) {
- qualifier = if (attr.qualifier.get.startsWith("gen_sub")) {
+ if (attr.qualifier.nonEmpty) {
+ qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
Some(carbonTable.getTableName)
} else {
- attr.qualifier
+ attr.qualifier.headOption
}
}
fieldToDataMapFieldMap +=
- getFieldToDataMapFields(attr.name,
+ getFieldToDataMapFields(
+ attr.name,
attr.dataType,
- qualifier,
+ qualifier.headOption,
"",
arrayBuffer,
carbonTable.getTableName)
@@ -248,7 +249,8 @@
/**
* Below method will be used to get the fields object for mv table
*/
- private def getFieldToDataMapFields(name: String,
+ private def getFieldToDataMapFields(
+ name: String,
dataType: DataType,
qualifier: Option[String],
aggregateType: String,
@@ -313,7 +315,7 @@
val updatedOutList = outputList.map { col =>
val duplicateColumn = duplicateNameCols
.find(a => a.semanticEquals(col))
- val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
+ val qualifiedName = col.qualifier.headOption.getOrElse(s"${ col.exprId.id }") + "_" + col.name
if (duplicateColumn.isDefined) {
val attributesOfDuplicateCol = duplicateColumn.get.collect {
case a: AttributeReference => a
@@ -329,7 +331,7 @@
attributeOfCol.exists(a => a.semanticEquals(expr)))
if (!isStrictDuplicate) {
Alias(col, qualifiedName)(exprId = col.exprId)
- } else if (col.qualifier.isDefined) {
+ } else if (col.qualifier.nonEmpty) {
Alias(col, qualifiedName)(exprId = col.exprId)
// this check is added in scenario where the column is direct Attribute reference and
// since duplicate columns select is allowed, we should just put alias for those columns
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 7e8eb96..6fbc87f 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -18,11 +18,12 @@
package org.apache.carbondata.mv.rewrite
+import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, Metadata}
import org.apache.carbondata.mv.datamap.MVHelper
import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
@@ -95,9 +96,12 @@
// Replace all compensation1 attributes with refrences of subsumer attributeset
val compensationFinal = compensation1.transformExpressions {
case ref: Attribute if subqueryAttributeSet.contains(ref) =>
- AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
+ exprId = ref.exprId, qualifier = subsumerName)
case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
- Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
+ CarbonToSparkAdapter.createAliasRef(
+ alias.child, alias.name, alias.exprId, subsumerName)
}
compensationFinal
} else {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index cb2043e..2b4247e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -17,10 +17,11 @@
package org.apache.carbondata.mv.plans.modular
+import org.apache.spark.sql.{CarbonToSparkAdapter, SQLConf}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.types.Metadata
import org.apache.carbondata.mv.plans
import org.apache.carbondata.mv.plans._
@@ -198,18 +199,18 @@
.isInstanceOf[Attribute]))
val aggOutputList = aggTransMap.values.flatMap(t => t._2)
.map { ref =>
- AttributeReference(ref.name, ref.dataType)(
- exprId = ref.exprId,
- qualifier = Some(hFactName))
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, Metadata.empty,
+ ref.exprId, Some(hFactName))
}
val hFactOutputSet = hFact.outputSet
// Update the outputlist qualifier
val hOutputList = (attrOutputList ++ aggOutputList).map {attr =>
attr.transform {
case ref: Attribute if hFactOutputSet.contains(ref) =>
- AttributeReference(ref.name, ref.dataType)(
- exprId = ref.exprId,
- qualifier = Some(hFactName))
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, Metadata.empty,
+ ref.exprId, Some(hFactName))
}
}.asInstanceOf[Seq[NamedExpression]]
@@ -217,9 +218,9 @@
val hPredList = s.predicateList.map{ pred =>
pred.transform {
case ref: Attribute if hFactOutputSet.contains(ref) =>
- AttributeReference(ref.name, ref.dataType)(
- exprId = ref.exprId,
- qualifier = Some(hFactName))
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, Metadata.empty,
+ ref.exprId, Some(hFactName))
}
}
val hSel = s.copy(
@@ -241,9 +242,9 @@
val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
wip.transformExpressions {
case ref: Attribute if hFactOutputSet.contains(ref) =>
- AttributeReference(ref.name, ref.dataType)(
- exprId = ref.exprId,
- qualifier = Some(hFactName))
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, Metadata.empty,
+ ref.exprId, Some(hFactName))
}
}
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index 30857c8..b694e78 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -52,9 +52,11 @@
plan transform {
case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _, _), _, _, _) =>
val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
- val makeupmap = children.zipWithIndex.flatMap {
+ val makeupmap: Map[Int, String] = children.zipWithIndex.flatMap {
case (child, i) =>
- aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+ aq.find(child.outputSet.contains(_))
+ .flatMap(_.qualifier.headOption)
+ .map((i, _))
}.toMap
g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 0bbacc4..7068b7e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -110,10 +110,7 @@
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
SparkSQLUtil.getRemoveRedundantAliasesObj(),
- RemoveRedundantProject,
- SimplifyCreateStructOps,
- SimplifyCreateArrayOps,
- SimplifyCreateMapOps) ++
+ RemoveRedundantProject) ++
extendedOperatorOptimizationRules: _*) ::
Batch(
"Check Cartesian Products", Once,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index 3b6c725..2033342 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -167,7 +167,9 @@
val aq = attributeSet.filter(_.qualifier.nonEmpty)
children.zipWithIndex.flatMap {
case (child, i) =>
- aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+ aq.find(child.outputSet.contains(_))
+ .flatMap(_.qualifier.headOption)
+ .map((i, _))
}.toMap
}
@@ -353,28 +355,13 @@
Seq.empty)
case l: LogicalRelation =>
val tableIdentifier = l.catalogTable.map(_.identifier)
- val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
- val table = tableIdentifier.map(_.table).getOrElse(null)
+ val database = tableIdentifier.flatMap(_.database).orNull
+ val table = tableIdentifier.map(_.table).orNull
Some(database, table, l.output, Nil, NoFlags, Seq.empty)
case l: LocalRelation => // used for unit test
Some(null, null, l.output, Nil, NoFlags, Seq.empty)
case _ =>
- // this check is added as we get MetastoreRelation in spark2.1,
- // this is removed in later spark version
- // TODO: this check can be removed once 2.1 support is removed from carbon
- if (SparkUtil.isSparkVersionEqualTo("2.1") &&
- plan.getClass.getName.equals("org.apache.spark.sql.hive.MetastoreRelation")) {
- val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("catalogTable", plan)
- .asInstanceOf[CatalogTable]
- Some(catalogTable.database,
- catalogTable.identifier.table,
- plan.output,
- Nil,
- NoFlags,
- Seq.empty)
- } else {
None
- }
}
}
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index d3ce38d..366284b 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -204,7 +204,7 @@
s.child match {
case a: Alias =>
val qualifierPrefix = a.qualifier
- .map(_ + ".").getOrElse("")
+ .map(_ + ".").headOption.getOrElse("")
s"$qualifierPrefix${
quoteIdentifier(a
.name)
@@ -221,7 +221,7 @@
s.child match {
case a: Alias =>
val qualifierPrefix = a.qualifier.map(_ + ".")
- .getOrElse("")
+ .headOption.getOrElse("")
s"$qualifierPrefix${ quoteIdentifier(a.name) }"
case other => other.sql
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
index b17eea2..c3a3a68 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -21,8 +21,11 @@
import scala.collection.immutable
+import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.types.Metadata
+import org.apache.spark.util.SparkUtil
import org.apache.carbondata.mv.expressions.modular._
import org.apache.carbondata.mv.plans._
@@ -116,18 +119,19 @@
if (i > -1) {
// this is a walk around for mystery of spark qualifier
if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
- AttributeReference(
- ref.name,
- ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
+ exprId = ref.exprId, qualifier = Some(aliasMap(i)))
} else {
ref
}
} else {
attrMap.get(ref) match {
case Some(alias) =>
- AttributeReference(
+ CarbonToSparkAdapter.createAttributeReference(
alias.child.asInstanceOf[AttributeReference].name,
- ref.dataType)(exprId = ref.exprId,
+ ref.dataType, nullable = true, metadata = Metadata.empty,
+ exprId = ref.exprId,
alias.child.asInstanceOf[AttributeReference].qualifier)
case None => ref
}
@@ -178,13 +182,12 @@
list = list :+ ((index, subqueryName))
newS = newS.transformExpressions {
case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
- AttributeReference(ref.name, ref.dataType)(
- exprId = ref.exprId,
- qualifier = Some(subqueryName))
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, Metadata.empty,
+ ref.exprId, Some(subqueryName))
case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
- Alias(alias.child, alias.name)(
- exprId = alias.exprId,
- qualifier = Some(subqueryName))
+ CarbonToSparkAdapter.createAliasRef(
+ alias.child, alias.name, alias.exprId, Some(subqueryName))
}
case _ =>
@@ -212,13 +215,12 @@
}
newG.transformExpressions {
case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
- AttributeReference(ref.name, ref.dataType)(
- exprId = ref.exprId,
- qualifier = Some(subqueryName))
+ CarbonToSparkAdapter.createAttributeReference(
+ ref.name, ref.dataType, nullable = true, Metadata.empty,
+ ref.exprId, Some(subqueryName))
case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
- Alias(alias.child, alias.name)(
- exprId = alias.exprId,
- qualifier = Some(subqueryName))
+ CarbonToSparkAdapter.createAliasRef(
+ alias.child, alias.name, alias.exprId, Some(subqueryName))
}.copy(alias = Some(subqueryName))
}
}
diff --git a/docs/alluxio-guide.md b/docs/alluxio-guide.md
index b1bfeeb..bad1fc0 100644
--- a/docs/alluxio-guide.md
+++ b/docs/alluxio-guide.md
@@ -50,7 +50,7 @@
### Running spark-shell
- Running the command in spark path
```$command
-./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
+./bin/spark-shell --jars ${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar,${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar
```
- Testing use alluxio by CarbonSession
```$scala
@@ -98,7 +98,7 @@
--master local \
--jars ${ALLUXIO_PATH}/client/alluxio-1.8.1-client.jar,${CARBONDATA_PATH}/examples/spark2/target/carbondata-examples-1.6.0-SNAPSHOT.jar \
--class org.apache.carbondata.examples.AlluxioExample \
-${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.2.1-hadoop2.7.2.jar \
+${CARBONDATA_PATH}/assembly/target/scala-2.11/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.4-hadoop2.7.2.jar \
false
```
**NOTE**: Please set runShell as false, which can avoid dependency on alluxio shell module.
diff --git a/docs/faq.md b/docs/faq.md
index 16cdfa5..88ca186 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -316,7 +316,7 @@
2. Use the following command :
```
- mvn -Pspark-2.1 -Dspark.version {yourSparkVersion} clean package
+ mvn -Pspark-2.4 -Dspark.version {yourSparkVersion} clean package
```
Note : Refrain from using "mvn clean package" without specifying the profile.
diff --git a/docs/presto-guide.md b/docs/presto-guide.md
index 483585f..0c49f35 100644
--- a/docs/presto-guide.md
+++ b/docs/presto-guide.md
@@ -237,9 +237,9 @@
$ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
```
Replace the spark and hadoop version with the version used in your cluster.
- For example, if you are using Spark 2.2.1 and Hadoop 2.7.2, you would like to compile using:
+ For example, if you are using Spark 2.4.4, you would like to compile using:
```
- mvn -DskipTests -Pspark-2.2 -Dspark.version=2.2.1 -Dhadoop.version=2.7.2 clean package
+ mvn -DskipTests -Pspark-2.4 -Dspark.version=2.4.4 -Dhadoop.version=2.7.2 clean package
```
Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index b66fce4..d007e03 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -37,11 +37,11 @@
- [CLOSE STREAM](#close-stream)
## Quick example
-Download and unzip spark-2.2.0-bin-hadoop2.7.tgz, and export $SPARK_HOME
+Download and unzip spark-2.4.4-bin-hadoop2.7.tgz, and export $SPARK_HOME
-Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
+Package carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.6.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
```shell
-mvn clean package -DskipTests -Pspark-2.2
+mvn clean package -DskipTests -Pspark-2.4
```
Start a socket data server in a terminal
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index b6921f2..d5c1188 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -37,9 +37,9 @@
s"$rootPath/examples/spark2/src/main/resources/log4j.properties")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+ .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
- spark.sparkContext.setLogLevel("INFO")
+ spark.sparkContext.setLogLevel("error")
exampleBody(spark)
spark.close()
}
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
index 73bf941..de3b5bc 100644
--- a/integration/flink/pom.xml
+++ b/integration/flink/pom.xml
@@ -184,26 +184,6 @@
<profiles>
<profile>
- <id>spark-2.2</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark2</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- </profile>
- <profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 199ff84..0790763 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -191,4 +191,103 @@
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>build-all</id>
+ <properties>
+ <spark.version>2.3.4</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>sdvtest</id>
+ <properties>
+ <maven.test.skip>true</maven.test.skip>
+ </properties>
+ </profile>
+ <profile>
+ <id>spark-2.3</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <spark.version>2.3.4</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.4</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.3</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.4</id>
+ <properties>
+ <spark.version>2.4.4</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.3</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.4</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index f629260..2548110 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,11 +37,12 @@
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.adapter.CarbonToSparkAdapter
import org.apache.carbondata.spark.util.CommonUtil
object CsvRDDHelper {
@@ -93,9 +94,9 @@
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
- FilePartition(
+ CarbonToSparkAdapter.createFilePartition(
partitions.size,
- currentFiles.toArray.toSeq)
+ currentFiles)
partitions += newPartition
}
currentFiles.clear()
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 13e7c45..a46568a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -17,23 +17,22 @@
package org.apache.spark.sql.util
-import java.lang.reflect.Method
-
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EmptyRule
+import org.apache.spark.sql.catalyst.analysis.EliminateView
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.optimizer.{CheckCartesianProducts, EliminateOuterJoin, NullPropagation, PullupCorrelatedPredicates, RemoveRedundantAliases, ReorderJoin}
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
+import org.apache.spark.util.SerializableConfiguration
object SparkSQLUtil {
def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -51,166 +50,60 @@
}
def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): Statistics = {
- if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- val method: Method = logicalPlanObj.getClass.getMethod("stats", classOf[SQLConf])
- method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics]
- } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
- val method: Method = logicalPlanObj.getClass.getMethod("stats")
- method.invoke(logicalPlanObj).asInstanceOf[Statistics]
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ logicalPlanObj.stats
}
def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
: NamedExpression = {
- if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
- clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
- invoke(null, r, input).asInstanceOf[NamedExpression]
- } else {
- r
- }
+ QueryPlan.normalizeExprId(r, input)
}
def getStatisticsObj(outputList: Seq[NamedExpression],
plan: LogicalPlan, stats: Statistics,
aliasMap: Option[AttributeMap[Attribute]] = None)
: Statistics = {
- val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
- if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- val output = outputList.map(_.toAttribute)
- val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
- table => AttributeMap(table.output.zip(output))
- }
- val rewrites = mapSeq.head
- val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
- getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
- var attributeStats = AttributeMap(attributes.iterator
- .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
- if (aliasMap.isDefined) {
- attributeStats = AttributeMap(
- attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
- }
- val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
- CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
- stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
- } else {
- val output = outputList.map(_.name)
- val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
- table => table.output.map(_.name).zip(output).toMap
- }
- val rewrites = mapSeq.head
- val colStats = CarbonReflectionUtils.getField("colStats", stats)
- .asInstanceOf[Map[String, ColumnStat]]
- var attributeStats = colStats.iterator
- .map { pair => (rewrites(pair._1), pair._2) }.toMap
- if (aliasMap.isDefined) {
- val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
- attributeStats =
- attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
- , pair._2))
- }
- CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
- stats.rowCount, attributeStats).asInstanceOf[Statistics]
+ val output = outputList.map(_.toAttribute)
+ val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+ table => AttributeMap(table.output.zip(output))
}
+ val rewrites = mapSeq.head
+ val attributes : AttributeMap[ColumnStat] = stats.attributeStats
+ var attributeStats = AttributeMap(attributes.iterator
+ .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+ if (aliasMap.isDefined) {
+ attributeStats = AttributeMap(
+ attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+ }
+ val hints = stats.hints
+ Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, hints)
}
def getEliminateViewObj(): Rule[LogicalPlan] = {
- if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
- CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
- } else {
- EmptyRule
- }
+ EliminateView
}
def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
- if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
- CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
- } else {
- EmptyRule
- }
+ PullupCorrelatedPredicates
}
def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
- if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
- CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
- } else {
- EmptyRule
- }
+ RemoveRedundantAliases
}
def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
- if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
- CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
- CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
- .asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
- CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
- .asInstanceOf[Rule[LogicalPlan]]
- }
- else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ ReorderJoin
}
def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
- if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin";
- CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
- CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
- .asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
- CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
- .asInstanceOf[Rule[LogicalPlan]]
- }
- else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ EliminateOuterJoin
}
def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = {
- if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation";
- CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
- CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
- .asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
- CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
- .asInstanceOf[Rule[LogicalPlan]]
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ NullPropagation
}
def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = {
- if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
- CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
- CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
- .asInstanceOf[Rule[LogicalPlan]]
- } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
- CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
- }
- else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ CheckCartesianProducts
}
/**
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 46692df..93e66ea 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,24 +17,22 @@
package org.apache.spark.util
-import java.lang.reflect.Method
-
import scala.reflect.runtime._
import scala.reflect.runtime.universe._
-import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.AstBuilder
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand}
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -60,45 +58,19 @@
def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
val im = rm.reflect(obj)
im.symbol.typeSignature.members.find(_.name.toString.equals(name))
- .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
+ .map(l => im.reflectField(l.asTerm).get).orNull
}
def getUnresolvedRelation(
tableIdentifier: TableIdentifier,
tableAlias: Option[String] = None): UnresolvedRelation = {
- val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- createObject(
- className,
- tableIdentifier,
- tableAlias)._1.asInstanceOf[UnresolvedRelation]
- } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- createObject(
- className,
- tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
- } else {
- throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
- }
+ UnresolvedRelation(tableIdentifier)
}
def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
relation: LogicalPlan,
view: Option[TableIdentifier]): SubqueryAlias = {
- val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- createObject(
- className,
- alias.getOrElse(""),
- relation,
- Option(view))._1.asInstanceOf[SubqueryAlias]
- } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- createObject(
- className,
- alias.getOrElse(""),
- relation)._1.asInstanceOf[SubqueryAlias]
- } else {
- throw new UnsupportedOperationException("Unsupported Spark version")
- }
+ SubqueryAlias(alias.getOrElse(""), relation)
}
def getInsertIntoCommand(table: LogicalPlan,
@@ -106,58 +78,23 @@
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean): InsertIntoTable = {
- val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- val overwriteOptions = createObject(
- "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
- overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
- createObject(
- className,
- table,
- partition,
- query,
- overwriteOptions,
- ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
- } else if (SparkUtil.isSparkVersionXandAbove("2.2") ) {
- createObject(
- className,
- table,
- partition,
- query,
- overwrite.asInstanceOf[Object],
- ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
- } else {
- throw new UnsupportedOperationException("Unsupported Spark version")
- }
+ InsertIntoTable(
+ table,
+ partition,
+ query,
+ overwrite,
+ ifPartitionNotExists)
}
def getLogicalRelation(relation: BaseRelation,
expectedOutputAttributes: Seq[Attribute],
catalogTable: Option[CatalogTable],
isStreaming: Boolean): LogicalRelation = {
- val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- createObject(
- className,
- relation,
- Some(expectedOutputAttributes),
- catalogTable)._1.asInstanceOf[LogicalRelation]
- } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- createObject(
- className,
- relation,
- expectedOutputAttributes,
- catalogTable)._1.asInstanceOf[LogicalRelation]
- } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
- createObject(
- className,
- relation,
- expectedOutputAttributes,
- catalogTable,
- isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
- } else {
- throw new UnsupportedOperationException("Unsupported Spark version")
- }
+ new LogicalRelation(
+ relation,
+ expectedOutputAttributes.asInstanceOf[Seq[AttributeReference]],
+ catalogTable,
+ isStreaming)
}
@@ -208,46 +145,28 @@
def getSessionState(sparkContext: SparkContext,
carbonSession: Object,
useHiveMetaStore: Boolean): Any = {
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ if (useHiveMetaStore) {
val className = sparkContext.conf.get(
CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
- "org.apache.spark.sql.hive.CarbonSessionState")
- createObject(className, carbonSession)._1
- } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- if (useHiveMetaStore) {
- val className = sparkContext.conf.get(
- CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
- "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
- val tuple = createObject(className, carbonSession, None)
- val method = tuple._2.getMethod("build")
- method.invoke(tuple._1)
- } else {
- val className = sparkContext.conf.get(
- CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
- "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
- val tuple = createObject(className, carbonSession, None)
- val method = tuple._2.getMethod("build")
- method.invoke(tuple._1)
- }
+ "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
+ val tuple = createObject(className, carbonSession, None)
+ val method = tuple._2.getMethod("build")
+ method.invoke(tuple._1)
} else {
- throw new UnsupportedOperationException("Spark version not supported")
+ val className = sparkContext.conf.get(
+ CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+ "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
+ val tuple = createObject(className, carbonSession, None)
+ val method = tuple._2.getMethod("build")
+ method.invoke(tuple._1)
}
}
def hasPredicateSubquery(filterExp: Expression) : Boolean = {
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
- val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression])
- val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
- hasSubquery
- } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
- val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
- val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
- hasSubquery
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
+ val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
+ val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
+ hasSubquery
}
def getDescribeTableFormattedField[T: TypeTag : reflect.ClassTag](obj: T): Boolean = {
@@ -265,19 +184,10 @@
rdd: RDD[InternalRow],
partition: Partitioning,
metadata: Map[String, String]): RowDataSourceScanExec = {
- val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
- if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
- createObject(className, output, rdd, relation.relation,
- partition, metadata,
- relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
- } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
- createObject(className, output, output.map(output.indexOf),
- pushedFilters.toSet, handledFilters.toSet, rdd,
- relation.relation,
- relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ RowDataSourceScanExec(output, output.map(output.indexOf),
+ pushedFilters.toSet, handledFilters.toSet, rdd,
+ relation.relation,
+ relation.catalogTable.map(_.identifier))
}
def invokewriteAndReadMethod(dataSourceObj: DataSource,
@@ -287,25 +197,7 @@
mode: SaveMode,
query: LogicalPlan,
physicalPlan: SparkPlan): BaseRelation = {
- if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- val method: Method = dataSourceObj.getClass
- .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
- method.invoke(dataSourceObj, mode, dataFrame)
- .asInstanceOf[BaseRelation]
- } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
- val method: Method = dataSourceObj.getClass
- .getMethod("writeAndRead",
- classOf[SaveMode],
- classOf[LogicalPlan],
- classOf[Seq[String]],
- classOf[SparkPlan])
- // since spark 2.3.2 version (SPARK-PR#22346),
- // change 'query.output' to 'query.output.map(_.name)'
- method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
- .asInstanceOf[BaseRelation]
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
}
/**
@@ -316,9 +208,7 @@
*/
def invokeAlterTableAddColumn(table: TableIdentifier,
colsToAdd: Seq[StructField]): Object = {
- val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
- CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
- ._1.asInstanceOf[RunnableCommand]
+ AlterTableAddColumnsCommand(table, colsToAdd)
}
def createSingleObject(className: String): Any = {
@@ -385,16 +275,6 @@
def invokeAnalyzerExecute(analyzer: Analyzer,
plan: LogicalPlan): LogicalPlan = {
- if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
- val method: Method = analyzer.getClass
- .getMethod("execute", classOf[LogicalPlan])
- method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
- } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
- val method: Method = analyzer.getClass
- .getMethod("executeAndCheck", classOf[LogicalPlan])
- method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
+ analyzer.executeAndCheck(plan)
}
}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala b/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
similarity index 65%
rename from integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
rename to integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
index 4abf189..a2ca9e6 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
+++ b/integration/spark-common/src/main/spark2.3/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.carbondata.spark.adapter
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import scala.collection.mutable.ArrayBuffer
-/**
- * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
- * recognize a subquery as such, and it allows us to write subquery aware transformations.
- */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
\ No newline at end of file
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+
+object CarbonToSparkAdapter {
+ def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+ FilePartition(index, files.toArray.toSeq)
+ }
+}
diff --git a/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
new file mode 100644
index 0000000..be82907
--- /dev/null
+++ b/integration/spark-common/src/main/spark2.4/org/apache/carbondata/spark/adapter/CarbonToSparkAdapter.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.adapter
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+ def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+ FilePartition(index, files.toArray)
+ }
+
+ def createAttributeReference(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ metadata: Metadata,
+ exprId: ExprId,
+ qualifier: Option[String],
+ attrRef : NamedExpression = null): AttributeReference = {
+ val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+ metadata)(exprId, qf)
+ }
+
+ def createAliasRef(
+ child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr : Option[NamedExpression] = None ) : Alias = {
+ Alias(child, name)(
+ exprId,
+ if (qualifier.nonEmpty) Seq(qualifier.get) else Seq(),
+ explicitMetadata)
+ }
+}
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index cda1954..1f1cac3 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -192,86 +192,6 @@
</properties>
</profile>
<profile>
- <id>spark-2.1</id>
- <properties>
- <spark.version>2.1.0</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/main/spark2.3plus</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/spark2.1andspark2.2</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>spark-2.2</id>
- <properties>
- <spark.version>2.2.1</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/main/spark2.3plus</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/spark2.1andspark2.2</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
@@ -286,30 +206,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/main/spark2.1andspark2.2</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/spark2.3plus</source>
- </sources>
- </configuration>
- </execution>
- </executions>
</plugin>
</plugins>
</build>
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonDictionaryWrapper.java
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/CarbonVectorProxy.java
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
similarity index 100%
rename from integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/ColumnVectorFactory.java
rename to integration/spark-datasource/src/main/scala/org/apache/spark/sql/ColumnVectorFactory.java
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
deleted file mode 100644
index 605df66..0000000
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.io.api.Binary;
-
-public class CarbonDictionaryWrapper extends Dictionary {
-
- private Binary[] binaries;
-
- CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
- super(encoding);
- binaries = new Binary[dictionary.getDictionarySize()];
- for (int i = 0; i < binaries.length; i++) {
- binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
- }
- }
-
- @Override
- public int getMaxId() {
- return binaries.length - 1;
- }
-
- @Override
- public Binary decodeToBinary(int id) {
- return binaries[id];
- }
-}
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 7d23d7c..0000000
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import java.lang.reflect.Field;
-import java.math.BigInteger;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
-
-import org.apache.parquet.column.Encoding;
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.3 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
- private ColumnarBatch columnarBatch;
- private ColumnVectorProxy[] columnVectorProxies;
-
-
- private void updateColumnVectors() {
- try {
- Field field = columnarBatch.getClass().getDeclaredField("columns");
- field.setAccessible(true);
- field.set(columnarBatch, columnVectorProxies);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.3 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- *
- * @param memMode which represent the type onheap or offheap vector.
- * @param outputSchema, metadata related to current schema of table.
- * @param rowNum rows number for vector reading
- * @param useLazyLoad Whether to use lazy load while getting the data.
- */
- public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
- boolean useLazyLoad) {
- columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
- columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
- for (int i = 0; i < columnVectorProxies.length; i++) {
- if (useLazyLoad) {
- columnVectorProxies[i] =
- new ColumnVectorProxyWithLazyLoad(columnarBatch.column(i), rowNum, memMode);
- } else {
- columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
- }
- }
- updateColumnVectors();
- }
-
- public ColumnVectorProxy getColumnVector(int ordinal) {
- return columnVectorProxies[ordinal];
- }
-
- /**
- * Returns the number of rows for read, including filtered rows.
- */
- public int numRows() {
- return columnarBatch.capacity();
- }
-
- /**
- * This API will return a columnvector from a batch of column vector rows
- * based on the ordinal
- *
- * @param ordinal
- * @return
- */
- public ColumnVector column(int ordinal) {
- return columnarBatch.column(ordinal);
- }
-
- /**
- * Resets this column for writing. The currently stored values are no longer accessible.
- */
- public void reset() {
- for (int i = 0; i < columnarBatch.numCols(); i++) {
- ((ColumnVectorProxy) columnarBatch.column(i)).reset();
- }
- }
-
- public void resetDictionaryIds(int ordinal) {
- (((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
- }
-
- /**
- * Returns the row in this batch at `rowId`. Returned row is reused across calls.
- */
- public InternalRow getRow(int rowId) {
- return columnarBatch.getRow(rowId);
- }
-
- /**
- * Returns the row in this batch at `rowId`. Returned row is reused across calls.
- */
- public Object getColumnarBatch() {
- return columnarBatch;
- }
-
- /**
- * Called to close all the columns in this batch. It is not valid to access the data after
- * calling this. This must be called at the end to clean up memory allocations.
- */
- public void close() {
- columnarBatch.close();
- }
-
- /**
- * Sets the number of rows in this batch.
- */
- public void setNumRows(int numRows) {
- columnarBatch.setNumRows(numRows);
- }
-
- public DataType dataType(int ordinal) {
- return columnarBatch.column(ordinal).dataType();
- }
-
- public static class ColumnVectorProxy extends ColumnVector {
-
- private ColumnVector vector;
-
- public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
- super(capacity, columnVector.dataType(), mode);
- try {
- Field childColumns =
- columnVector.getClass().getSuperclass().getDeclaredField("childColumns");
- childColumns.setAccessible(true);
- Object o = childColumns.get(columnVector);
- childColumns.set(this, o);
- Field resultArray =
- columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
- resultArray.setAccessible(true);
- Object o1 = resultArray.get(columnVector);
- resultArray.set(this, o1);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- vector = columnVector;
- }
-
- public void putRowToColumnBatch(int rowId, Object value) {
- org.apache.spark.sql.types.DataType t = dataType();
- if (null == value) {
- putNull(rowId);
- } else {
- if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
- putBoolean(rowId, (boolean) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
- putByte(rowId, (byte) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
- putShort(rowId, (short) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
- putInt(rowId, (int) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
- putLong(rowId, (long) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
- putFloat(rowId, (float) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
- putDouble(rowId, (double) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
- UTF8String v = (UTF8String) value;
- putByteArray(rowId, v.getBytes());
- } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
- DecimalType dt = (DecimalType) t;
- Decimal d = Decimal.fromDecimal(value);
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- putInt(rowId, (int) d.toUnscaledLong());
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- putLong(rowId, d.toUnscaledLong());
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- putByteArray(rowId, bytes, 0, bytes.length);
- }
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval) value;
- vector.getChildColumn(0).putInt(rowId, c.months);
- vector.getChildColumn(1).putLong(rowId, c.microseconds);
- } else if (t instanceof org.apache.spark.sql.types.DateType) {
- putInt(rowId, (int) value);
- } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
- putLong(rowId, (long) value);
- }
- }
- }
-
- public void putBoolean(int rowId, boolean value) {
- vector.putBoolean(rowId, value);
- }
-
- public void putByte(int rowId, byte value) {
- vector.putByte(rowId, value);
- }
-
- public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
- vector.putBytes(rowId, count, src, srcIndex);
- }
-
- public void putShort(int rowId, short value) {
- vector.putShort(rowId, value);
- }
-
- public void putInt(int rowId, int value) {
- vector.putInt(rowId, value);
- }
-
- public void putFloat(int rowId, float value) {
- vector.putFloat(rowId, value);
- }
-
- public void putFloats(int rowId, int count, float[] src, int srcIndex) {
- vector.putFloats(rowId, count, src, srcIndex);
- }
-
- public void putLong(int rowId, long value) {
- vector.putLong(rowId, value);
- }
-
- public void putDouble(int rowId, double value) {
- vector.putDouble(rowId, value);
- }
-
- public void putInts(int rowId, int count, int value) {
- vector.putInts(rowId, count, value);
- }
-
- public void putInts(int rowId, int count, int[] src, int srcIndex) {
- vector.putInts(rowId, count, src, srcIndex);
- }
-
- public void putShorts(int rowId, int count, short value) {
- vector.putShorts(rowId, count, value);
- }
-
- public void putShorts(int rowId, int count, short[] src, int srcIndex) {
- vector.putShorts(rowId, count, src, srcIndex);
- }
-
- public void putLongs(int rowId, int count, long value) {
- vector.putLongs(rowId, count, value);
- }
-
- public void putLongs(int rowId, int count, long[] src, int srcIndex) {
- vector.putLongs(rowId, count, src, srcIndex);
- }
-
- public void putDoubles(int rowId, int count, double value) {
- vector.putDoubles(rowId, count, value);
- }
-
- public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
- vector.putDoubles(rowId, count, src, srcIndex);
- }
-
- public DataType dataType(int ordinal) {
- return vector.dataType();
- }
-
- public void putNotNull(int rowId) {
- vector.putNotNull(rowId);
- }
-
- public void putNotNulls(int rowId, int count) {
- vector.putNotNulls(rowId, count);
- }
-
- public void putDictionaryInt(int rowId, int value) {
- vector.getDictionaryIds().putInt(rowId, value);
- }
-
- public void setDictionary(CarbonDictionary dictionary) {
- if (null != dictionary) {
- CarbonDictionaryWrapper dictionaryWrapper =
- new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
- vector.setDictionary(dictionaryWrapper);
- this.dictionary = dictionaryWrapper;
- } else {
- this.dictionary = null;
- vector.setDictionary(null);
- }
- }
-
- public void putNull(int rowId) {
- vector.putNull(rowId);
- }
-
- public void putNulls(int rowId, int count) {
- vector.putNulls(rowId, count);
- }
-
- public boolean hasDictionary() {
- return vector.hasDictionary();
- }
-
- public ColumnVector reserveDictionaryIds(int capacity) {
- this.dictionaryIds = vector.reserveDictionaryIds(capacity);
- return dictionaryIds;
- }
-
- @Override
- public boolean isNullAt(int i) {
- return vector.isNullAt(i);
- }
-
- @Override
- public boolean getBoolean(int i) {
- return vector.getBoolean(i);
- }
-
- @Override
- public byte getByte(int i) {
- return vector.getByte(i);
- }
-
- @Override
- public short getShort(int i) {
- return vector.getShort(i);
- }
-
- @Override
- public int getInt(int i) {
- return vector.getInt(i);
- }
-
- @Override
- public long getLong(int i) {
- return vector.getLong(i);
- }
-
- @Override
- public float getFloat(int i) {
- return vector.getFloat(i);
- }
-
- @Override
- public double getDouble(int i) {
- return vector.getDouble(i);
- }
-
- @Override
- protected void reserveInternal(int capacity) {
- }
-
- @Override
- public void reserve(int requiredCapacity) {
- vector.reserve(requiredCapacity);
- }
-
- @Override
- public long nullsNativeAddress() {
- return vector.nullsNativeAddress();
- }
-
- @Override
- public long valuesNativeAddress() {
- return vector.valuesNativeAddress();
- }
-
- @Override
- public void putBooleans(int rowId, int count, boolean value) {
- vector.putBooleans(rowId, count, value);
- }
-
- @Override
- public void putBytes(int rowId, int count, byte value) {
- vector.putBytes(rowId, count, value);
- }
-
- @Override
- public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
- vector.putIntsLittleEndian(rowId, count, src, srcIndex);
- }
-
- @Override
- public int getDictId(int rowId) {
- return vector.getDictId(rowId);
- }
-
- @Override
- public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
- vector.putLongsLittleEndian(rowId, count, src, srcIndex);
- }
-
- @Override
- public void putFloats(int rowId, int count, float value) {
- vector.putFloats(rowId, count, value);
- }
-
- @Override
- public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
- vector.putFloats(rowId, count, src, srcIndex);
- }
-
- @Override
- public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
- vector.putDoubles(rowId, count, src, srcIndex);
- }
-
- @Override
- public void putArray(int rowId, int offset, int length) {
- vector.putArray(rowId, offset, length);
- }
-
- @Override
- public int getArrayLength(int rowId) {
- return vector.getArrayLength(rowId);
- }
-
- @Override
- public int getArrayOffset(int rowId) {
- return vector.getArrayOffset(rowId);
- }
-
- @Override
- public void loadBytes(Array array) {
- vector.loadBytes(array);
- }
-
- @Override
- public int putByteArray(int rowId, byte[] value, int offset, int count) {
- return vector.putByteArray(rowId, value, offset, count);
- }
-
- /**
- * It keeps all binary data of all rows to it.
- * Should use along with @{putArray(int rowId, int offset, int length)} to keep lengths
- * and offset.
- */
- public void putAllByteArray(byte[] data, int offset, int length) {
- vector.arrayData().appendBytes(length, data, offset);
- }
-
- @Override
- public void close() {
- vector.close();
- }
-
- public void reset() {
- if (isConstant) {
- return;
- }
- vector.reset();
- }
-
- public void setLazyPage(LazyPageLoader lazyPage) {
- lazyPage.loadPage();
- }
-
- public ColumnVector getVector() {
- return vector;
- }
- }
-
- public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
-
- private ColumnVector vector;
-
- private LazyPageLoader pageLoad;
-
- private boolean isLoaded;
-
- public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector, int capacity, MemoryMode mode) {
- super(columnVector, capacity, mode);
- vector = columnVector;
- }
-
- @Override
- public boolean isNullAt(int i) {
- checkPageLoaded();
- return vector.isNullAt(i);
- }
-
- @Override
- public boolean getBoolean(int i) {
- checkPageLoaded();
- return vector.getBoolean(i);
- }
-
- @Override
- public byte getByte(int i) {
- checkPageLoaded();
- return vector.getByte(i);
- }
-
- @Override
- public short getShort(int i) {
- checkPageLoaded();
- return vector.getShort(i);
- }
-
- @Override
- public int getInt(int i) {
- checkPageLoaded();
- return vector.getInt(i);
- }
-
- @Override
- public long getLong(int i) {
- checkPageLoaded();
- return vector.getLong(i);
- }
-
- @Override
- public float getFloat(int i) {
- checkPageLoaded();
- return vector.getFloat(i);
- }
-
- @Override
- public double getDouble(int i) {
- checkPageLoaded();
- return vector.getDouble(i);
- }
-
- @Override
- public int getArrayLength(int rowId) {
- checkPageLoaded();
- return vector.getArrayLength(rowId);
- }
-
- @Override
- public int getArrayOffset(int rowId) {
- checkPageLoaded();
- return vector.getArrayOffset(rowId);
- }
-
- private void checkPageLoaded() {
- if (!isLoaded) {
- if (pageLoad != null) {
- pageLoad.loadPage();
- }
- isLoaded = true;
- }
- }
-
- public void reset() {
- if (isConstant) {
- return;
- }
- isLoaded = false;
- vector.reset();
- }
-
- public void setLazyPage(LazyPageLoader lazyPage) {
- this.pageLoad = lazyPage;
- }
-
- }
-}
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 7b65e0b..dfebf6f 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -253,92 +253,6 @@
</properties>
</profile>
<profile>
- <id>spark-2.1</id>
- <properties>
- <spark.version>2.1.0</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/main/spark2.2</exclude>
- <exclude>src/main/spark2.3</exclude>
- <exclude>src/main/commonTo2.2And2.3</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/spark2.1</source>
- <source>src/main/commonTo2.1And2.2</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>spark-2.2</id>
- <properties>
- <spark.version>2.2.1</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/main/spark2.1</exclude>
- <exclude>src/main/spark2.3</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/spark2.2</source>
- <source>src/main/commonTo2.2And2.3</source>
- <source>src/main/commonTo2.1And2.2</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
@@ -355,9 +269,7 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
- <exclude>src/main/spark2.1</exclude>
- <exclude>src/main/spark2.2</exclude>
- <exclude>src/main/commonTo2.1And2.2</exclude>
+ <exclude>src/main/spark2.4</exclude>
</excludes>
</configuration>
</plugin>
@@ -375,7 +287,46 @@
<configuration>
<sources>
<source>src/main/spark2.3</source>
- <source>src/main/commonTo2.2And2.3</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.4</id>
+ <properties>
+ <spark.version>2.4.4</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.3</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.4</source>
</sources>
</configuration>
</execution>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index aa650e0..78d6a46 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -17,13 +17,8 @@
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
case class CastExpr(expr: Expression) extends Filter {
override def references: Array[String] = null
@@ -33,26 +28,6 @@
override def references: Array[String] = null
}
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
- extends LeafExpression with NamedExpression with CodegenFallback {
-
- type EvaluatedType = Any
-
- override def toString: String = s"input[" + colExp.getColIndex + "]"
-
- override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
- override def name: String = colExp.getColumnName
-
- override def toAttribute: Attribute = throw new UnsupportedOperationException
-
- override def exprId: ExprId = throw new UnsupportedOperationException
-
- override def qualifier: Option[String] = null
-
- override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
-
case class CarbonEndsWith(expr: Expression) extends Filter {
override def references: Array[String] = null
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index e020a99..23c078a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -258,41 +258,67 @@
s"""
|org.apache.spark.sql.DictTuple $value = $decodeDecimal($dictRef, ${ev.value});
""".stripMargin
- ExprCode(code, s"$value.getIsNull()",
- s"((org.apache.spark.sql.types.Decimal)$value.getValue())")
+ CarbonToSparkAdapter.createExprCode(
+ code,
+ s"$value.getIsNull()",
+ s"((org.apache.spark.sql.types.Decimal)$value.getValue())",
+ expr.dataType)
} else {
getDictionaryColumnIds(index)._3.getDataType match {
case CarbonDataTypes.INT => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeInt($dictRef, ${ ev.value });
""".stripMargin
- ExprCode(code, s"$value.getIsNull()", s"((Integer)$value.getValue())")
+ CarbonToSparkAdapter.createExprCode(
+ code,
+ s"$value.getIsNull()",
+ s"((Integer)$value.getValue())",
+ expr.dataType)
case CarbonDataTypes.SHORT => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeShort($dictRef, ${ ev.value });
""".stripMargin
- ExprCode(code, s"$value.getIsNull()", s"((Short)$value.getValue())")
+ CarbonToSparkAdapter.createExprCode(
+ code,
+ s"$value.getIsNull()",
+ s"((Short)$value.getValue())",
+ expr.dataType)
case CarbonDataTypes.DOUBLE => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeDouble($dictRef, ${ ev.value });
""".stripMargin
- ExprCode(code, s"$value.getIsNull()", s"((Double)$value.getValue())")
+ CarbonToSparkAdapter.createExprCode(
+ code,
+ s"$value.getIsNull()",
+ s"((Double)$value.getValue())",
+ expr.dataType)
case CarbonDataTypes.LONG => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeLong($dictRef, ${ ev.value });
""".stripMargin
- ExprCode(code, s"$value.getIsNull()", s"((Long)$value.getValue())")
+ CarbonToSparkAdapter.createExprCode(
+ code,
+ s"$value.getIsNull()",
+ s"((Long)$value.getValue())",
+ expr.dataType)
case CarbonDataTypes.BOOLEAN => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeBool($dictRef, ${ ev.value });
""".stripMargin
- ExprCode(code, s"$value.getIsNull()", s"((Boolean)$value.getValue())")
+ CarbonToSparkAdapter.createExprCode(
+ code,
+ s"$value.getIsNull()",
+ s"((Boolean)$value.getValue())",
+ expr.dataType)
case _ => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeStr($dictRef, ${ev.value});
""".stripMargin
- ExprCode(code, s"$value.getIsNull()", s"((UTF8String)$value.getValue())")
-
+ CarbonToSparkAdapter.createExprCode(
+ code,
+ s"$value.getIsNull()",
+ s"((UTF8String)$value.getValue())",
+ expr.dataType)
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 376d121..703df20 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -40,11 +40,9 @@
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.CarbonScalaUtil
import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamingQueryListener, StreamSinkFactory}
@@ -183,12 +181,9 @@
LOGGER.warn("Carbon Table [" +dbName +"] [" +tableName +"] is not found, " +
"Now existing Schema will be overwritten with default properties")
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
- val identifier = AbsoluteTableIdentifier.from(
- CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),
- dbName,
- tableName)
+ val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
val updatedParams = CarbonSource.updateAndCreateTable(
- identifier, dataSchema, sparkSession, metaStore, parameters, None)
+ dbName, tableName, tablePath, dataSchema, sparkSession, metaStore, parameters, None)
(CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
@@ -278,9 +273,10 @@
lazy val listenerAdded = new mutable.HashMap[Int, Boolean]()
def createTableInfoFromParams(
+ databaseName: String,
+ tableName: String,
parameters: Map[String, String],
dataSchema: StructType,
- identifier: AbsoluteTableIdentifier,
query: Option[LogicalPlan],
sparkSession: SparkSession): TableModel = {
val sqlParser = new CarbonSpark2SqlParser
@@ -301,8 +297,8 @@
sqlParser.getFields(dataSchema)
}
val bucketFields = sqlParser.getBucketFields(map, fields, options)
- sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
- identifier.getTableName, fields, Nil, map, bucketFields)
+ sqlParser.prepareTableModel(ifNotExistPresent = false, Option(databaseName),
+ tableName, fields, Nil, map, bucketFields)
}
/**
@@ -314,7 +310,8 @@
def updateCatalogTableWithCarbonSchema(
tableDesc: CatalogTable,
sparkSession: SparkSession,
- query: Option[LogicalPlan] = None): CatalogTable = {
+ query: Option[LogicalPlan] = None,
+ persistSchema: Boolean = true): CatalogTable = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
val storageFormat = tableDesc.storage
val properties = storageFormat.properties
@@ -322,14 +319,16 @@
val tablePath = CarbonEnv.getTablePath(
tableDesc.identifier.database, tableDesc.identifier.table)(sparkSession)
val dbName = CarbonEnv.getDatabaseName(tableDesc.identifier.database)(sparkSession)
- val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableDesc.identifier.table)
val map = updateAndCreateTable(
- identifier,
+ dbName,
+ tableDesc.identifier.table,
+ tablePath,
tableDesc.schema,
sparkSession,
metaStore,
properties,
- query)
+ query,
+ persistSchema)
// updating params
val updatedFormat = CarbonToSparkAdapter
.getUpdatedStorageFormat(storageFormat, map, tablePath)
@@ -351,36 +350,56 @@
}
}
+ def createTableInfo(
+ databaseName: String,
+ tableName: String,
+ tablePath: String,
+ dataSchema: StructType,
+ properties: Map[String, String],
+ query: Option[LogicalPlan],
+ sparkSession: SparkSession
+ ): TableInfo = {
+ val model = createTableInfoFromParams(
+ databaseName, tableName, properties, dataSchema, query, sparkSession)
+ val tableInfo = TableNewProcessor(model)
+ val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
+ tableInfo.setTablePath(tablePath)
+ tableInfo.setTransactionalTable(isTransactionalTable)
+ tableInfo.setDatabaseName(databaseName)
+ val schemaEvolutionEntry = new SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+ tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+ tableInfo
+ }
+
def updateAndCreateTable(
- identifier: AbsoluteTableIdentifier,
+ databaseName: String,
+ tableName: String,
+ tablePath: String,
dataSchema: StructType,
sparkSession: SparkSession,
metaStore: CarbonMetaStore,
properties: Map[String, String],
- query: Option[LogicalPlan]): Map[String, String] = {
- val model = createTableInfoFromParams(properties, dataSchema, identifier, query, sparkSession)
- val tableInfo: TableInfo = TableNewProcessor(model)
- val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true")
- tableInfo.setTablePath(identifier.getTablePath)
- tableInfo.setTransactionalTable(isTransactionalTable)
- tableInfo.setDatabaseName(identifier.getDatabaseName)
- val schemaEvolutionEntry = new SchemaEvolutionEntry
- schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
- tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
- val map = if (!metaStore.isReadFromHiveMetaStore && isTransactionalTable) {
- metaStore.saveToDisk(tableInfo, identifier.getTablePath)
+ query: Option[LogicalPlan],
+ persistSchema: Boolean = true): Map[String, String] = {
+ val tableInfo = createTableInfo(
+ databaseName, tableName, tablePath, dataSchema, properties, query, sparkSession)
+ val map = if (persistSchema &&
+ !metaStore.isReadFromHiveMetaStore &&
+ tableInfo.isTransactionalTable) {
+ metaStore.saveToDisk(tableInfo, tablePath)
new java.util.HashMap[String, String]()
} else {
CarbonUtil.convertToMultiStringMap(tableInfo)
}
properties.foreach(e => map.put(e._1, e._2))
- map.put("tablepath", identifier.getTablePath)
- map.put("dbname", identifier.getDatabaseName)
+ map.put("tablepath", tablePath)
+ map.put("dbname", databaseName)
if (map.containsKey("tableName")) {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
LOGGER.warn("tableName is not required in options, ignoring it")
}
- map.put("tableName", identifier.getTableName)
+ map.put("tableName", tableName)
map.asScala.toMap
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
index 8a37989..233f28d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -38,5 +38,5 @@
override def genCode(ctx: CodegenContext): ExprCode = nonDt.genCode(ctx)
- override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy()
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index ded87b9..9f203c8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -164,6 +164,7 @@
}
private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
+ LOGGER.info("Trying to drop DataMap from system folder")
try {
if (dataMapSchema == null) {
dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 130580d..b90faa7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -690,13 +690,13 @@
attr.dataType
}
}
- CarbonToSparkAdapter.createAttributeReference(attr.name,
+ CarbonToSparkAdapter.createAttributeReference(
+ attr.name,
updatedDataType,
attr.nullable,
attr.metadata,
attr.exprId,
- attr.qualifier,
- attr)
+ attr.qualifier)
}
// Only select the required columns
var output = if (partition.nonEmpty) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index c12ff6c..d3f8079 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -97,12 +97,12 @@
carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
thriftTable)(sparkSession)
- // In case of spark2.2 and above and , when we call
+ // when we call
// alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
// in case of add column, spark gets the catalog table and then it itself adds the partition
// columns if the table is partition table for all the new data schema sent by carbon,
// so there will be duplicate partition columns, so send the columns without partition columns
- val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
+ val cols = if (carbonTable.isHivePartitionTable) {
val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains
(col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 7e66d34..cf05a9d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -163,7 +163,7 @@
if (isDataTypeChange) {
// if column datatype change operation is on partition column, then fail the datatype change
// operation
- if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
+ if (null != carbonTable.getPartitionInfo) {
val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
partitionColumns.asScala.foreach {
col =>
@@ -286,14 +286,13 @@
// update the schema changed column at the specific index in carbonColumns based on schema order
carbonColumns
.update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
- // In case of spark2.2 and above and , when we call
+ // When we call
// alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
// in case of rename column or change datatype, spark gets the catalog table and then it itself
// adds the partition columns if the table is partition table for all the new data schema sent
// by carbon, so there will be duplicate partition columns, so send the columns without
// partition columns
- val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
- carbonTable.isHivePartitionTable) {
+ val columns = if (carbonTable.isHivePartitionTable) {
val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains(
col))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index bdc0228..4cb8a2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -151,13 +151,12 @@
val cols = carbonTable.getCreateOrderColumn().asScala
.collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
.filterNot(column => delCols.contains(column))
- // In case of spark2.2 and above and , when we call
+ // When we call
// alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
// in case of drop column, spark gets the catalog table and then it itself adds the partition
// columns if the table is partition table for all the new data schema sent by carbon,
// so there will be duplicate partition columns, so send the columns without partition columns
- val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
- carbonTable.isHivePartitionTable) {
+ val columns = if (carbonTable.isHivePartitionTable) {
val partitionColumns = partitionInfo.getColumnSchemaList.asScala
val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
Some(carbonColumnsWithoutPartition)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
new file mode 100644
index 0000000..f36e1ac
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import org.apache.spark.sql.{CarbonEnv, CarbonSource, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, MetadataCommand}
+
+/**
+ * Command to create table in case of 'USING CARBONDATA' DDL
+ *
+ * @param catalogTable catalog table created by spark
+ * @param ignoreIfExists ignore if table exists
+ * @param sparkSession spark session
+ */
+case class CarbonCreateDataSourceTableCommand(
+ catalogTable: CatalogTable,
+ ignoreIfExists: Boolean,
+ sparkSession: SparkSession)
+ extends MetadataCommand {
+
+ override def processMetadata(session: SparkSession): Seq[Row] = {
+ // Run the spark command to create table in metastore before saving carbon schema
+ // in table path.
+ // This is required for spark 2.4, because spark 2.4 will fail to create table
+ // if table path is created before hand
+ val updatedCatalogTable =
+ CarbonSource.updateCatalogTableWithCarbonSchema(catalogTable, session, None, false)
+ val sparkCommand = CreateDataSourceTableCommand(updatedCatalogTable, ignoreIfExists)
+ sparkCommand.run(session)
+
+ // save the table info (carbondata's schema) in table path
+ val tableName = catalogTable.identifier.table
+ val dbName = CarbonEnv.getDatabaseName(catalogTable.identifier.database)(session)
+ val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(session)
+ val tableInfo = CarbonSource.createTableInfo(
+ dbName, tableName, tablePath, catalogTable.schema, catalogTable.properties, None, session)
+ val metastore = CarbonEnv.getInstance(session).carbonMetaStore
+ metastore.saveToDisk(tableInfo, tablePath)
+
+ Seq.empty
+ }
+
+ override protected def opName: String = "CREATE TABLE USING CARBONDATA"
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 4adb1aa..2a5d78b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -207,8 +207,9 @@
} catch {
case _: Exception => // No operation
}
+ throw e
val msg = s"Create table'$tableName' in database '$dbName' failed"
- throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
+ throwMetadataException(dbName, tableName, s"$msg, ${e.getMessage}")
}
}
val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 54a5757..8700b29 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -134,6 +134,7 @@
}
val indexDatamapSchemas =
DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+ LOGGER.info(s"Dropping DataMaps in table $tableName, size: " + indexDatamapSchemas.size())
if (!indexDatamapSchemas.isEmpty) {
childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
val command = CarbonDropDataMapCommand(schema.getDataMapName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 2e1f91f..8acc749 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -142,10 +142,13 @@
val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
val attrs = projectExprsNeedToDecode.map { attr =>
- val newAttr = AttributeReference(attr.name,
+ val newAttr = CarbonToSparkAdapter.createAttributeReference(
+ attr.name,
attr.dataType,
attr.nullable,
- attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
+ attr.metadata,
+ attr.exprId,
+ Option(table.carbonRelation.tableName))
relation.addAttribute(newAttr)
newAttr
}
@@ -194,8 +197,7 @@
attr.nullable,
attr.metadata,
attr.exprId,
- attr.qualifier,
- attr)
+ attr.qualifier)
}
}
partitions =
@@ -403,7 +405,7 @@
newProjectList :+= reference
a.transform {
case s: ScalaUDF =>
- ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+ CarbonToSparkAdapter.createScalaUDF(s, reference)
}
case other => other
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index a851bc3..49a3d3b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,7 @@
import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand}
import org.apache.spark.sql.execution.command.schema._
-import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateDataSourceTableCommand, CarbonCreateTableLikeCommand, CarbonDescribeFormattedCommand, CarbonDropTableCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
@@ -171,7 +171,7 @@
ExecutedCommandExec(addColumn) :: Nil
}
// TODO: remove this else if check once the 2.1 version is unsupported by carbon
- } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ } else {
val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
.map {
a =>
@@ -185,8 +185,6 @@
.invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) ::
Nil
// TODO: remove this else check once the 2.1 version is unsupported by carbon
- } else {
- throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -275,9 +273,12 @@
if table.provider.get != DDLUtils.HIVE_PROVIDER
&& (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| table.provider.get.equalsIgnoreCase("carbondata")) =>
- val updatedCatalog = CarbonSource
- .updateCatalogTableWithCarbonSchema(table, sparkSession)
- val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
+ val cmd = if (SparkUtil.isSparkVersionEqualTo("2.4")) {
+ CarbonCreateDataSourceTableCommand(table, ignoreIfExists, sparkSession)
+ } else {
+ val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
+ CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
+ }
ExecutedCommandExec(cmd) :: Nil
case AlterTableSetPropertiesCommand(tableName, properties, isView)
if CarbonEnv.getInstance(sparkSession).carbonMetaStore
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index fd7defa..5e82eb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -26,8 +26,8 @@
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{MixedFormatHandlerUtil, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
-import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 5323293..eda131f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -78,9 +78,7 @@
"Update operation is not supported for mv datamap table")
}
}
- val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- relation
- } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val tableRelation =
alias match {
case Some(_) =>
CarbonReflectionUtils.getSubqueryAlias(
@@ -90,9 +88,6 @@
Some(table.tableIdentifier))
case _ => relation
}
- } else {
- throw new UnsupportedOperationException("Unsupported Spark version.")
- }
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
@@ -221,21 +216,15 @@
}
}
// include tuple id in subquery
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- Project(projList, relation)
- } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
- alias match {
- case Some(_) =>
- val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
- sparkSession,
- alias,
- relation,
- Some(table.tableIdentifier))
- Project(projList, subqueryAlias)
- case _ => Project(projList, relation)
- }
- } else {
- throw new UnsupportedOperationException("Unsupported Spark version.")
+ alias match {
+ case Some(_) =>
+ val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias(
+ sparkSession,
+ alias,
+ relation,
+ Some(table.tableIdentifier))
+ Project(projList, subqueryAlias)
+ case _ => Project(projList, relation)
}
}
CarbonProjectForDeleteCommand(
@@ -314,13 +303,7 @@
}
}
val newChild: LogicalPlan = if (newChildOutput == child.output) {
- if (SparkUtil.isSparkVersionEqualTo("2.1")) {
- CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
- } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
- CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
- } else {
- throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
- }
+ throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
} else {
Project(newChildOutput, child)
}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
similarity index 99%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
index 8f4d45e..04beea7 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -47,4 +47,4 @@
logicalPlan
}
}
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b2ba7f4..849d7ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -200,8 +200,7 @@
carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
carbonDatasourceHadoopRelation.carbonRelation
case SubqueryAlias(_, c)
- if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
- (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
c.getClass.getName.equals(
"org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
@@ -524,8 +523,7 @@
carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
carbonDataSourceHadoopRelation
case SubqueryAlias(_, c)
- if (SparkUtil.isSparkVersionXandAbove("2.2")) &&
- (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
c.getClass.getName
.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
c.getClass.getName.equals(
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
similarity index 68%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
index 72d3ae2..e19966f 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizerUtil.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.sql.hive
import org.apache.spark.sql.CarbonDatasourceHadoopRelation
@@ -29,7 +46,7 @@
}
Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
- case In(value, Seq(l:ListQuery)) =>
+ case In(value, Seq(l: ListQuery)) =>
val tPlan = l.plan.transform {
case lr: LogicalRelation
if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
@@ -41,4 +58,4 @@
}
transFormedPlan
}
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 9b3ff87..68c293b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -18,11 +18,11 @@
import java.util.LinkedHashSet
-import scala.Array.canBuildFrom
import scala.collection.JavaConverters._
+import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{CarbonMetastoreTypes, SparkTypeConverter}
@@ -119,7 +119,8 @@
val dataType = SparkTypeConverter.addDecimalScaleAndPrecision(column, dType)
CarbonMetastoreTypes.toDataType(dataType)
}
- AttributeReference(column.getColName, output, nullable = true )(
+ CarbonToSparkAdapter.createAttributeReference(
+ column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
qualifier = Option(tableName + "." + column.getColName))
} else {
val output = CarbonMetastoreTypes.toDataType {
@@ -129,7 +130,8 @@
case others => others
}
}
- AttributeReference(column.getColName, output, nullable = true)(
+ CarbonToSparkAdapter.createAttributeReference(
+ column.getColName, output, nullable = true, Metadata.empty, NamedExpression.newExprId,
qualifier = Option(tableName + "." + column.getColName))
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index 20d43df..dcba730 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -26,11 +26,9 @@
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
/**
- * This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration,
+ * This interface defines those common api used by carbon for spark integration,
* but are not defined in SessionCatalog or HiveSessionCatalog to give contract to the
* Concrete implementation classes.
- * For example CarbonSessionCatalog defined in 2.1 and 2.2.
- *
*/
@InterfaceAudience.Internal
@InterfaceStability.Stable
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
similarity index 92%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index f78c785..44b3bfd 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -27,12 +27,11 @@
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.CarbonSparkSqlParser
@@ -61,8 +60,8 @@
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader)
extends HiveSessionCatalog (
- externalCatalog,
- globalTempViewManager,
+ SparkAdapter.getExternalCatalogCatalog(externalCatalog),
+ SparkAdapter.getGlobalTempViewManager(globalTempViewManager),
new HiveMetastoreCatalog(sparkSession),
functionRegistry,
conf,
@@ -111,8 +110,9 @@
* @return
*/
override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
- sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- .asInstanceOf[HiveExternalCatalog].client
+ SparkAdapter.getHiveExternalCatalog(
+ sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+ ).client
}
override def alterAddColumns(tableIdentifier: TableIdentifier,
@@ -174,9 +174,10 @@
* @param identifier
* @return
*/
- override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+ override def getPartitionsAlternate(
+ partitionFilters: Seq[Expression],
sparkSession: SparkSession,
- identifier: TableIdentifier) = {
+ identifier: TableIdentifier): Seq[CatalogTablePartition] = {
CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
}
@@ -233,14 +234,14 @@
}
private def externalCatalog: HiveExternalCatalog =
- session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+ SparkAdapter.getHiveExternalCatalog(session.sharedState.externalCatalog)
/**
* Create a Hive aware resource loader.
*/
override protected lazy val resourceLoader: HiveSessionResourceLoader = {
val client: HiveClient = externalCatalog.client.newSession()
- new HiveSessionResourceLoader(session, client)
+ new HiveSessionResourceLoader(session, SparkAdapter.getHiveClient(client))
}
override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
@@ -274,4 +275,4 @@
}
override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-}
\ No newline at end of file
+}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
similarity index 85%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e3f1d3f..e9bcb43 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -1,39 +1,36 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.spark.sql.hive
-import java.util.concurrent.Callable
+import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.util.SparkTypeConverter
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.util.SparkTypeConverter
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.execution.datasources.LogicalRelation
/**
* This class refresh the relation from cache if the carbontable in
@@ -177,8 +174,7 @@
def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
plan match {
- case sa@SubqueryAlias(_,
- MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
+ case sa@SubqueryAlias(_, MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
sa.copy(child = sa.child.asInstanceOf[LogicalRelation].copy())
case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
plan.asInstanceOf[LogicalRelation].copy()
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
similarity index 93%
rename from integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index 73b6790..36ec2c8 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -38,8 +38,8 @@
fileStorage.equalsIgnoreCase("'carbonfile'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
- ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),ctx.locationSpec(0),
- Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+ ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
+ ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
helper.createCarbonTable(createTableTuple)
} else {
super.visitCreateHiveTable(ctx)
@@ -47,6 +47,6 @@
}
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
- visitAddTableColumns(parser,ctx)
+ visitAddTableColumns(parser, ctx)
}
}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
similarity index 100%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSqlConf.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConf.scala
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
similarity index 98%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index ee9fb0f..0335b36 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -20,12 +19,12 @@
import java.net.URI
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand}
import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.util.CarbonReflectionUtils
/**
@@ -49,7 +48,7 @@
Seq.empty
}
- override def processData(sparkSession: SparkSession): Seq[Row] ={
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
similarity index 100%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
rename to integration/spark2/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 2765c5f..e288e6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -315,15 +315,13 @@
* @return
*/
def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
- var isCompatible = true
- if (SparkUtil.isSparkVersionXandAbove("2.3")) {
- val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
- .asInstanceOf[Option[Expression]]
- if (trimStr.isDefined) {
- isCompatible = false
- }
+ val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
+ .asInstanceOf[Option[Expression]]
+ if (trimStr.isDefined) {
+ false
+ } else {
+ true
}
- isCompatible
}
def transformExpression(expr: Expression): CarbonExpression = {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 10b661a..4866301 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -305,7 +305,7 @@
}
}
val (selectStmt, relation) =
- if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
+ if (selectPattern.findFirstIn(sel.toLowerCase).isEmpty ||
!StringUtils.isEmpty(subQueryResults)) {
// if subQueryResults are not empty means, it is not join with main table.
// so use subQueryResults in update with value flow.
@@ -348,8 +348,6 @@
rel
}
-
-
private def updateRelation(
r: UnresolvedRelation,
tableIdent: Seq[String],
@@ -362,8 +360,6 @@
case Seq(dbName, tableName) => Some(tableName)
case Seq(tableName) => Some(tableName)
}
- // Use Reflection to choose between Spark2.1 and Spark2.2
- // Move UnresolvedRelation(tableIdentifier, tableAlias) to reflection.
CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias)
}
}
@@ -386,8 +382,6 @@
val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
- // Use Reflection to choose between Spark2.1 and Spark2.2
- // Move (UnresolvedRelation(tableIdent, alias), tableIdent, alias) to reflection.
val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias)
(unresolvedRelation, tableIdent, alias, tableIdentifier)
@@ -678,6 +672,7 @@
val name = field.column.toLowerCase
field.copy(column = name, name = Some(name))
}
+
protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ DROP ~ COLUMNS ~
("(" ~> rep1sep(ident, ",") <~ ")") <~ opt(";") ^^ {
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
deleted file mode 100644
index 79a6240..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
- def addSparkListener(sparkContext: SparkContext) = {
- sparkContext.addSparkListener(new SparkListener {
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
- SparkSession.setDefaultSession(null)
- SparkSession.sqlListener.set(null)
- }
- })
- }
-
- def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
- metadata: Metadata,exprId: ExprId, qualifier: Option[String],
- attrRef : NamedExpression): AttributeReference = {
- AttributeReference(
- name,
- dataType,
- nullable,
- metadata)(exprId, qualifier,attrRef.isGenerated)
- }
-
- def createAliasRef(child: Expression,
- name: String,
- exprId: ExprId = NamedExpression.newExprId,
- qualifier: Option[String] = None,
- explicitMetadata: Option[Metadata] = None,
- namedExpr: Option[NamedExpression] = None): Alias = {
- val isGenerated:Boolean = if (namedExpr.isDefined) {
- namedExpr.get.isGenerated
- } else {
- false
- }
- Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
- }
-
- def getExplainCommandObj() : ExplainCommand = {
- ExplainCommand(OneRowRelation)
- }
-
- def getPartitionKeyFilter(
- partitionSet: AttributeSet,
- filterPredicates: Seq[Expression]): ExpressionSet = {
- ExpressionSet(
- ExpressionSet(filterPredicates)
- .filter(_.references.subsetOf(partitionSet)))
- }
-
- def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
- map: Map[String, String],
- tablePath: String): CatalogStorageFormat = {
- storageFormat.copy(properties = map, locationUri = Some(tablePath))
- }
-
- def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
- Seq(OptimizeCodegen(conf))
- }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
deleted file mode 100644
index 69ed477..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation}
-import org.apache.spark.sql.types.StructType
-
-object MixedFormatHandlerUtil {
-
- def getScanForSegments(
- @transient relation: HadoopFsRelation,
- output: Seq[Attribute],
- outputSchema: StructType,
- partitionFilters: Seq[Expression],
- dataFilters: Seq[Expression],
- tableIdentifier: Option[TableIdentifier]
- ): FileSourceScanExec = {
- val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
- FileSourceScanExec(
- relation,
- output,
- outputSchema,
- partitionFilters,
- pushedDownFilters,
- tableIdentifier)
- }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
deleted file mode 100644
index eb3e88d..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.catalog
-
-import com.google.common.base.Objects
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * A `LogicalPlan` that represents a hive table.
- *
- * TODO: remove this after we completely make hive as a data source.
- */
-case class HiveTableRelation(
- tableMeta: CatalogTable,
- dataCols: Seq[AttributeReference],
- partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
- assert(tableMeta.identifier.database.isDefined)
- assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
- assert(tableMeta.schema.sameType(dataCols.toStructType))
-
- // The partition column should always appear after data columns.
- override def output: Seq[AttributeReference] = dataCols ++ partitionCols
-
- def isPartitioned: Boolean = partitionCols.nonEmpty
-
- override def equals(relation: Any): Boolean = relation match {
- case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
- case _ => false
- }
-
- override def hashCode(): Int = {
- Objects.hashCode(tableMeta.identifier, output)
- }
-
- override def newInstance(): HiveTableRelation = copy(
- dataCols = dataCols.map(_.newInstance()),
- partitionCols = partitionCols.map(_.newInstance()))
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
deleted file mode 100644
index 9a88255..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
-import org.apache.spark.sql.catalyst.expressions.aggregate.First
-import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-class MigrateOptimizer {
-
-}
-
-/**
- * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
- */
-object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case Deduplicate(keys, child, streaming) if !streaming =>
- val keyExprIds = keys.map(_.exprId)
- val aggCols = child.output.map { attr =>
- if (keyExprIds.contains(attr.exprId)) {
- attr
- } else {
- Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
- }
- }
- Aggregate(keys, aggCols, child)
- }
-}
-
-/** A logical plan for `dropDuplicates`. */
-case class Deduplicate(
- keys: Seq[Attribute],
- child: LogicalPlan,
- streaming: Boolean) extends UnaryNode {
-
- override def output: Seq[Attribute] = child.output
-}
-
-/**
- * Remove projections from the query plan that do not make any modifications.
- */
-object RemoveRedundantProject extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case p @ Project(_, child) if p.output == child.output => child
- }
-}
-
-/**
- * push down operations into [[CreateNamedStructLike]].
- */
-object SimplifyCreateStructOps extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transformExpressionsUp {
- // push down field extraction
- case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
- createNamedStructLike.valExprs(ordinal)
- }
- }
-}
-
-/**
- * push down operations into [[CreateArray]].
- */
-object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transformExpressionsUp {
- // push down field selection (array of structs)
- case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
- // instead f selecting the field on the entire array,
- // select it from each member of the array.
- // pushing down the operation this way open other optimizations opportunities
- // (i.e. struct(...,x,...).x)
- CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
- // push down item selection.
- case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
- // instead of creating the array and then selecting one row,
- // remove array creation altgether.
- if (idx >= 0 && idx < elems.size) {
- // valid index
- elems(idx)
- } else {
- // out of bounds, mimic the runtime behavior and return null
- Literal(null, ga.dataType)
- }
- }
- }
-}
-
-/**
- * push down operations into [[CreateMap]].
- */
-object SimplifyCreateMapOps extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transformExpressionsUp {
- case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
- }
- }
-}
-
-
-/**
- * Removes MapObjects when the following conditions are satisfied
- * 1. Mapobject(... lambdavariable(..., false) ...), which means types for input and output
- * are primitive types with non-nullable
- * 2. no custom collection class specified representation of data item.
- */
-object EliminateMapObjects extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
- case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData
- }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
deleted file mode 100644
index 8eb05fc..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
- val carbonProperties = CarbonProperties.getInstance()
-
- /**
- * To initialize dynamic param defaults along with usage docs
- */
- def addDefaultCarbonParams(): Unit = {
- val ENABLE_UNSAFE_SORT =
- SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
- .doc("To enable/ disable unsafe sort.")
- .booleanConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
- SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
- .doc("To set carbon task distribution.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- val BAD_RECORDS_LOGGER_ENABLE =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
- .doc("To enable/ disable carbon bad record logger.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants
- .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- val BAD_RECORDS_ACTION =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
- .doc("To configure the bad records action.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- val IS_EMPTY_DATA_BAD_RECORD =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
- .doc("Property to decide weather empty data to be considered bad/ good record.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
- .toBoolean)
- val SORT_SCOPE =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
- .doc("Property to specify sort scope.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- val BAD_RECORD_PATH =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
- .doc("Property to configure the bad record location.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- val GLOBAL_SORT_PARTITIONS =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
- .doc("Property to configure the global sort partitions.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- val DATEFORMAT =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
- .doc("Property to configure data format for date type columns.")
- .stringConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
- "carbon.input.segments.<database_name>.<table_name>")
- .doc("Property to configure the list of segments to query.").stringConf
- .createWithDefault(carbonProperties
- .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
- }
- /**
- * to set the dynamic properties default values
- */
- def addDefaultCarbonSessionParams(): Unit = {
- sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
- carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index 6b94806..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.spark.util.CarbonReflectionUtils
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonHiveSessionCatalog(
- externalCatalog: HiveExternalCatalog,
- globalTempViewManager: GlobalTempViewManager,
- sparkSession: SparkSession,
- functionResourceLoader: FunctionResourceLoader,
- functionRegistry: FunctionRegistry,
- conf: SQLConf,
- hadoopConf: Configuration)
- extends HiveSessionCatalog(
- externalCatalog,
- globalTempViewManager,
- sparkSession,
- functionResourceLoader,
- functionRegistry,
- conf,
- hadoopConf) with CarbonSessionCatalog {
-
- private lazy val carbonEnv = {
- val env = new CarbonEnv
- env.init(sparkSession)
- env
- }
- /**
- * return's the carbonEnv instance
- * @return
- */
- override def getCarbonEnv() : CarbonEnv = {
- carbonEnv
- }
-
- def alterAddColumns(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[ColumnSchema]])
- : Unit = {
- alterTable(tableIdentifier, schemaParts, cols)
- }
-
- def alterDropColumns(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[ColumnSchema]])
- : Unit = {
- alterTable(tableIdentifier, schemaParts, cols)
- }
-
- def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[ColumnSchema]])
- : Unit = {
- alterTable(tableIdentifier, schemaParts, cols)
- }
-
- // Initialize all listeners to the Operation bus.
- CarbonEnv.init
-
- /**
- * This method will invalidate carbonrelation from cache if carbon table is updated in
- * carbon catalog
- *
- * @param name
- * @param alias
- * @return
- */
- override def lookupRelation(name: TableIdentifier,
- alias: Option[String]): LogicalPlan = {
- val rtnRelation = super.lookupRelation(name, alias)
- var toRefreshRelation = false
- rtnRelation match {
- case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
- toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
- case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
- toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
- case _ =>
- }
-
- if (toRefreshRelation) {
- super.lookupRelation(name, alias)
- } else {
- rtnRelation
- }
- }
-
- /**
- * returns hive client from session state
- *
- * @return
- */
- override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
- sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
- }
-
- override def createPartitions(
- tableName: TableIdentifier,
- parts: Seq[CatalogTablePartition],
- ignoreIfExists: Boolean): Unit = {
- try {
- val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
- val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
- super.createPartitions(tableName, updatedParts, ignoreIfExists)
- } catch {
- case e: Exception =>
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
- }
-
- /**
- * This is alternate way of getting partition information. It first fetches all partitions from
- * hive and then apply filter instead of querying hive along with filters.
- * @param partitionFilters
- * @param sparkSession
- * @param identifier
- * @return
- */
- def getPartitionsAlternate(
- partitionFilters: Seq[Expression],
- sparkSession: SparkSession,
- identifier: TableIdentifier) = {
- val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
- val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
- val partitionSchema = catalogTable.partitionSchema
- if (partitionFilters.nonEmpty) {
- val boundPredicate =
- InterpretedPredicate.create(partitionFilters.reduce(And).transform {
- case att: AttributeReference =>
- val index = partitionSchema.indexWhere(_.name == att.name)
- BoundReference(index, partitionSchema(index).dataType, nullable = true)
- })
- allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
- } else {
- allPartitions
- }
- }
-
- /**
- * Update the storageformat with new location information
- */
- override def updateStorageLocation(
- path: Path,
- storage: CatalogStorageFormat,
- newTableName: String,
- dbName: String): CatalogStorageFormat = {
- storage.copy(locationUri = Some(path.toString))
- }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- * @param sparkSession
- */
-class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
-
- override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
- experimentalMethods.extraStrategies = extraStrategies
-
- experimentalMethods.extraOptimizations = extraOptimizations
-
- def extraStrategies: Seq[Strategy] = {
- Seq(
- new StreamingTableStrategy(sparkSession),
- new CarbonLateDecodeStrategy,
- new DDLStrategy(sparkSession)
- )
- }
-
- def extraOptimizations: Seq[Rule[LogicalPlan]] = {
- Seq(new CarbonIUDRule,
- new CarbonUDFTransformRule,
- new CarbonLateDecodeRule)
- }
-
- override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
- def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
- def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
- CarbonPreInsertionCasts(sparkSession) ::
- CarbonIUDAnalysisRule(sparkSession) ::
- AnalyzeCreateTable(sparkSession) ::
- PreprocessTableInsertion(conf) ::
- DataSourceAnalysis(conf) ::
- (if (conf.runSQLonFile) {
- new ResolveDataSource(sparkSession) :: Nil
- } else { Nil })
- }
-
- override lazy val analyzer: Analyzer =
- new CarbonAnalyzer(catalog, conf, sparkSession,
- new Analyzer(catalog, conf) {
- override val extendedResolutionRules =
- if (extendedAnalyzerRules.nonEmpty) {
- extendedAnalyzerRules ++ internalAnalyzerRules
- } else {
- internalAnalyzerRules
- }
- override val extendedCheckRules = Seq(
- PreWriteCheck(conf, catalog))
- }
- )
-
- /**
- * Internal catalog for managing table and database states.
- */
- override lazy val catalog = {
- new CarbonHiveSessionCatalog(
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
- sparkSession.sharedState.globalTempViewManager,
- sparkSession,
- functionResourceLoader,
- functionRegistry,
- conf,
- newHadoopConf())
- }
-}
-
-class CarbonAnalyzer(catalog: SessionCatalog,
- conf: CatalystConf,
- sparkSession: SparkSession,
- analyzer: Analyzer) extends Analyzer(catalog, conf) {
-
- val mvPlan = try {
- CarbonReflectionUtils.createObject(
- "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
- sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
- } catch {
- case e: Exception =>
- null
- }
-
- override def execute(plan: LogicalPlan): LogicalPlan = {
- val logicalPlan = analyzer.execute(plan)
- if (mvPlan != null) {
- mvPlan.apply(logicalPlan)
- } else {
- logicalPlan
- }
- }
-}
-
-class CarbonOptimizer(
- catalog: SessionCatalog,
- conf: SQLConf,
- experimentalMethods: ExperimentalMethods)
- extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
- override def execute(plan: LogicalPlan): LogicalPlan = {
- val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
- super.execute(transFormedPlan)
- }
-}
-
-object CarbonOptimizerUtil {
- def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
- // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule,
- // And optimize whole plan at once.
- val transFormedPlan = plan.transform {
- case filter: Filter =>
- filter.transformExpressions {
- case s: ScalarSubquery =>
- val tPlan = s.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- ScalarSubquery(tPlan, s.children, s.exprId)
- case p: PredicateSubquery =>
- val tPlan = p.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
- }
- }
- transFormedPlan
- }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
- extends SparkSqlAstBuilder(conf) {
-
- val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
- val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
-
- if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("carbondata") ||
- fileStorage.equalsIgnoreCase("'carbonfile'") ||
- fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
- ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
- Option(ctx.STRING()).map(string),
- ctx.AS, ctx.query, fileStorage)
- helper.createCarbonTable(createTableTuple)
- } else {
- super.visitCreateTable(ctx)
- }
- }
-
- override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
- withOrigin(ctx) {
- if (CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
- CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
- super.visitShowTables(ctx)
- } else {
- CarbonShowTablesCommand(
- Option(ctx.db).map(_.getText),
- Option(ctx.pattern).map(string))
- }
- }
- }
-
- override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
- CarbonExplainCommand(super.visitExplain(ctx))
- }
-}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
deleted file mode 100644
index dd690e4..0000000
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{
- CatalogRelation, CatalogTable, CatalogTableType,
- SimpleCatalogRelation
-}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{
- AlterTableRecoverPartitionsCommand, DDLUtils,
- RunnableCommand
-}
-import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.StructType
-
-/**
- * Create table 'using carbondata' and insert the query result into it.
- * @param table the Catalog Table
- * @param mode SaveMode:Ignore,OverWrite,ErrorIfExists,Append
- * @param query the query whose result will be insert into the new relation
- */
-
-case class CreateCarbonSourceTableAsSelectCommand(
- table: CatalogTable,
- mode: SaveMode,
- query: LogicalPlan)
- extends RunnableCommand {
-
- override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- assert(table.tableType != CatalogTableType.VIEW)
- assert(table.provider.isDefined)
- assert(table.schema.isEmpty)
-
- val provider = table.provider.get
- val sessionState = sparkSession.sessionState
- val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val tableIdentWithDB = table.identifier.copy(database = Some(db))
- val tableName = tableIdentWithDB.unquotedString
-
- var createMetastoreTable = false
- var existingSchema = Option.empty[StructType]
- if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
- // Check if we need to throw an exception or just return.
- mode match {
- case SaveMode.ErrorIfExists =>
- throw new AnalysisException(s"Table $tableName already exists. " +
- s"If you are using saveAsTable, you can set SaveMode to " +
- s"SaveMode.Append to " +
- s"insert data into the table or set SaveMode to SaveMode" +
- s".Overwrite to overwrite" +
- s"the existing data. " +
- s"Or, if you are using SQL CREATE TABLE, you need to drop " +
- s"$tableName first.")
- case SaveMode.Ignore =>
- // Since the table already exists and the save mode is Ignore, we will just return.
- return Seq.empty[Row]
- case SaveMode.Append =>
- // Check if the specified data source match the data source of the existing table.
- val existingProvider = DataSource.lookupDataSource(provider)
- // TODO: Check that options from the resolved relation match the relation that we are
- // inserting into (i.e. using the same compression).
-
- // Pass a table identifier with database part, so that `lookupRelation` won't get temp
- // views unexpectedly.
- EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
- case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
- // check if the file formats match
- l.relation match {
- case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
- throw new AnalysisException(
- s"The file format of the existing table $tableName is " +
- s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
- s"format `$provider`")
- case _ =>
- }
- if (query.schema.size != l.schema.size) {
- throw new AnalysisException(
- s"The column number of the existing schema[${ l.schema }] " +
- s"doesn't match the data schema[${ query.schema }]'s")
- }
- existingSchema = Some(l.schema)
- case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
- existingSchema = Some(s.metadata.schema)
- case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
- throw new AnalysisException("Saving data in the Hive serde table " +
- s"${ c.catalogTable.identifier } is not supported yet. " +
- s"Please use the insertInto() API as an alternative..")
- case o =>
- throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
- }
- case SaveMode.Overwrite =>
- sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
- // Need to create the table again.
- createMetastoreTable = true
- }
- } else {
- // The table does not exist. We need to create it in metastore.
- createMetastoreTable = true
- }
-
- val data = Dataset.ofRows(sparkSession, query)
- val df = existingSchema match {
- // If we are inserting into an existing table, just use the existing schema.
- case Some(s) => data.selectExpr(s.fieldNames: _*)
- case None => data
- }
-
- val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
- Some(sessionState.catalog.defaultTablePath(table.identifier))
- } else {
- table.storage.locationUri
- }
-
- // Create the relation based on the data of df.
- val pathOption = tableLocation.map("path" -> _)
- val dataSource = DataSource(
- sparkSession,
- className = provider,
- partitionColumns = table.partitionColumnNames,
- bucketSpec = table.bucketSpec,
- options = table.storage.properties ++ pathOption,
- catalogTable = Some(table))
-
- val result = try {
- dataSource.write(mode, df)
- } catch {
- case ex: AnalysisException =>
- logError(s"Failed to write to table $tableName in $mode mode", ex)
- throw ex
- }
- result match {
- case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
- sparkSession.sqlContext.conf.manageFilesourcePartitions =>
- // Need to recover partitions into the metastore so our saved data is visible.
- sparkSession.sessionState.executePlan(
- AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
- case _ =>
- }
-
- // Refresh the cache of the table in the catalog.
- sessionState.catalog.refreshTable(tableIdentWithDB)
- Seq.empty[Row]
- }
-}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
deleted file mode 100644
index 446b5a5..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql
-
-import java.net.URI
-
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
-import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, Metadata}
-
-object CarbonToSparkAdapter {
-
- def addSparkListener(sparkContext: SparkContext) = {
- sparkContext.addSparkListener(new SparkListener {
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
- SparkSession.setDefaultSession(null)
- SparkSession.sqlListener.set(null)
- }
- })
- }
-
- def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
- metadata: Metadata,exprId: ExprId, qualifier: Option[String],
- attrRef : NamedExpression): AttributeReference = {
- AttributeReference(
- name,
- dataType,
- nullable,
- metadata)(exprId, qualifier,attrRef.isGenerated)
- }
-
- def createAliasRef(child: Expression,
- name: String,
- exprId: ExprId = NamedExpression.newExprId,
- qualifier: Option[String] = None,
- explicitMetadata: Option[Metadata] = None,
- namedExpr: Option[NamedExpression] = None): Alias = {
- val isGenerated:Boolean = if (namedExpr.isDefined) {
- namedExpr.get.isGenerated
- } else {
- false
- }
- Alias(child, name)(exprId, qualifier, explicitMetadata,isGenerated)
- }
-
- def getExplainCommandObj() : ExplainCommand = {
- ExplainCommand(OneRowRelation)
- }
-
- def getPartitionKeyFilter(
- partitionSet: AttributeSet,
- filterPredicates: Seq[Expression]): ExpressionSet = {
- ExpressionSet(
- ExpressionSet(filterPredicates)
- .filter(_.references.subsetOf(partitionSet)))
- }
-
- def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
- Seq(OptimizeCodegen(conf))
- }
-
- def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
- map: Map[String, String],
- tablePath: String): CatalogStorageFormat = {
- storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
- }
-}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
deleted file mode 100644
index 7177f1a..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
- extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
-
- val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
- override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
- val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat)
-
- if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("carbondata") ||
- fileStorage.equalsIgnoreCase("'carbonfile'") ||
- fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
- ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
- Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
- helper.createCarbonTable(createTableTuple)
- } else {
- super.visitCreateHiveTable(ctx)
- }
- }
-
- override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
- visitAddTableColumns(parser,ctx)
- }
-}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
new file mode 100644
index 0000000..d134de1
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
@@ -0,0 +1,28 @@
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
+
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
+ extends LeafExpression with NamedExpression with CodegenFallback {
+
+ type EvaluatedType = Any
+
+ override def toString: String = s"input[" + colExp.getColIndex + "]"
+
+ override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
+
+ override def name: String = colExp.getColumnName
+
+ override def toAttribute: Attribute = throw new UnsupportedOperationException
+
+ override def exprId: ExprId = throw new UnsupportedOperationException
+
+ override def qualifier: Option[String] = null
+
+ override def newInstance(): NamedExpression = throw new UnsupportedOperationException
+}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 9094dfe..7003c26 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -23,11 +23,11 @@
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, Metadata}
object CarbonToSparkAdapter {
@@ -41,8 +41,8 @@
}
def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
- metadata: Metadata, exprId: ExprId, qualifier: Option[String],
- attrRef : NamedExpression): AttributeReference = {
+ metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+ attrRef : NamedExpression = null): AttributeReference = {
AttributeReference(
name,
dataType,
@@ -50,14 +50,23 @@
metadata)(exprId, qualifier)
}
- def createAliasRef(child: Expression,
- name: String,
- exprId: ExprId = NamedExpression.newExprId,
- qualifier: Option[String] = None,
- explicitMetadata: Option[Metadata] = None,
- namedExpr : Option[NamedExpression] = None ) : Alias = {
+ def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
+ ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+ }
- Alias(child, name)(exprId, qualifier, explicitMetadata)
+ def createExprCode(code: String, isNull: String, value: String, dataType: DataType = null
+ ): ExprCode = {
+ ExprCode(code, isNull, value)
+ }
+
+ def createAliasRef(child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr : Option[NamedExpression] = None ) : Alias = {
+
+ Alias(child, name)(exprId, qualifier, explicitMetadata)
}
def getExplainCommandObj() : ExplainCommand = {
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
new file mode 100644
index 0000000..bffb900
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -0,0 +1,27 @@
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+
+object MixedFormatHandlerUtil {
+
+ def getScanForSegments(
+ @transient relation: HadoopFsRelation,
+ output: Seq[Attribute],
+ outputSchema: StructType,
+ partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression],
+ tableIdentifier: Option[TableIdentifier]
+ ): FileSourceScanExec = {
+ FileSourceScanExec(
+ relation,
+ output,
+ outputSchema,
+ partitionFilters,
+ dataFilters,
+ tableIdentifier)
+ }
+}
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 5435f04..25d5543 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.execution.strategy
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
similarity index 97%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
rename to integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
index 36f166d..4ad3b11 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -21,21 +21,20 @@
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
new file mode 100644
index 0000000..61959c2
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/SparkAdapter.scala
@@ -0,0 +1,14 @@
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager}
+import org.apache.spark.sql.hive.client.HiveClient
+
+object SparkAdapter {
+ def getExternalCatalogCatalog(catalog: HiveExternalCatalog) = catalog
+
+ def getGlobalTempViewManager(manager: GlobalTempViewManager) = manager
+
+ def getHiveClient(client: HiveClient) = client
+
+ def getHiveExternalCatalog(catalog: ExternalCatalog) = catalog.asInstanceOf[HiveExternalCatalog]
+}
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
new file mode 100644
index 0000000..ee4c9ce
--- /dev/null
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
@@ -0,0 +1,44 @@
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
+ extends LeafExpression with NamedExpression with CodegenFallback {
+
+ type EvaluatedType = Any
+
+ override def toString: String = s"input[" + colExp.getColIndex + "]"
+
+ override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
+
+ override def name: String = colExp.getColumnName
+
+ override def toAttribute: Attribute = throw new UnsupportedOperationException
+
+ override def exprId: ExprId = throw new UnsupportedOperationException
+
+ override def qualifier: Seq[String] = null
+
+ override def newInstance(): NamedExpression = throw new UnsupportedOperationException
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
new file mode 100644
index 0000000..082d1ec
--- /dev/null
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapter {
+
+ def addSparkListener(sparkContext: SparkContext) = {
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ }
+ })
+ }
+
+ def createAttributeReference(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ metadata: Metadata,
+ exprId: ExprId,
+ qualifier: Option[String],
+ attrRef : NamedExpression = null): AttributeReference = {
+ val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+ metadata)(exprId, qf)
+ }
+
+ def createAttributeReference(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ metadata: Metadata,
+ exprId: ExprId,
+ qualifier: Seq[String]): AttributeReference = {
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+ metadata)(exprId, qualifier)
+ }
+
+ def createScalaUDF(s: ScalaUDF, reference: AttributeReference) = {
+ ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
+ }
+
+ def createExprCode(code: String, isNull: String, value: String, dataType: DataType) = {
+ ExprCode(
+ code"$code",
+ JavaCode.isNullVariable(isNull),
+ JavaCode.variable(value, dataType))
+ }
+
+ def createAliasRef(
+ child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Seq[String] = Seq.empty,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr: Option[NamedExpression] = None) : Alias = {
+ Alias(child, name)(exprId, qualifier, explicitMetadata)
+ }
+
+ def createAliasRef(
+ child: Expression,
+ name: String,
+ exprId: ExprId,
+ qualifier: Option[String]) : Alias = {
+ Alias(child, name)(exprId,
+ if (qualifier.isEmpty) Seq.empty else Seq(qualifier.get),
+ None)
+ }
+
+ def getExplainCommandObj() : ExplainCommand = {
+ ExplainCommand(OneRowRelation())
+ }
+
+ /**
+ * As a part of SPARK-24085 Hive tables supports scala subquery for
+ * parition tables, so Carbon also needs to supports
+ * @param partitionSet
+ * @param filterPredicates
+ * @return
+ */
+ def getPartitionKeyFilter(
+ partitionSet: AttributeSet,
+ filterPredicates: Seq[Expression]): ExpressionSet = {
+ ExpressionSet(
+ ExpressionSet(filterPredicates)
+ .filterNot(SubqueryExpression.hasSubquery)
+ .filter(_.references.subsetOf(partitionSet)))
+ }
+
+ // As per SPARK-22520 OptimizeCodegen is removed in 2.3.1
+ def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+ Seq.empty
+ }
+
+ def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+ map: Map[String, String],
+ tablePath: String): CatalogStorageFormat = {
+ storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+ }
+}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
similarity index 98%
rename from integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
index d180cd3..7b20c06 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -37,6 +38,7 @@
output,
outputSchema,
partitionFilters,
+ None,
dataFilters,
tableIdentifier)
}
diff --git a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
similarity index 81%
rename from integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 7605574..60ee7ea 100644
--- a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.execution.strategy
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -32,7 +32,7 @@
val rdd: RDD[InternalRow],
@transient override val relation: HadoopFsRelation,
val partitioning: Partitioning,
- override val metadata: Map[String, String],
+ val md: Map[String, String],
identifier: Option[TableIdentifier],
@transient private val logicalRelation: LogicalRelation)
extends FileSourceScanExec(
@@ -40,14 +40,20 @@
output,
relation.dataSchema,
Seq.empty,
+ None,
Seq.empty,
identifier) {
- override val supportsBatch: Boolean = true
+ // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+ override lazy val supportsBatch: Boolean = true
- override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+ // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+ override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
(partitioning, Nil)
+ // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+ override lazy val metadata: Map[String, String] = md
+
override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
similarity index 95%
rename from integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
rename to integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
index 2f9ad3e..808099d 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -23,15 +23,14 @@
import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.internal.SQLConf
-
class CarbonOptimizer(
catalog: SessionCatalog,
conf: SQLConf,
experimentalMethods: ExperimentalMethods)
- extends SparkOptimizer(catalog, conf, experimentalMethods) {
+ extends SparkOptimizer(catalog, experimentalMethods) {
override def execute(plan: LogicalPlan): LogicalPlan = {
val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
super.execute(transFormedPlan)
}
-}
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
new file mode 100644
index 0000000..8132188
--- /dev/null
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/SparkAdapter.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener, GlobalTempViewManager}
+import org.apache.spark.sql.hive.client.HiveClient
+
+object SparkAdapter {
+ def getExternalCatalogCatalog(catalog: HiveExternalCatalog) =
+ () => catalog
+
+ def getGlobalTempViewManager(manager: GlobalTempViewManager) =
+ () => manager
+
+ def getHiveClient(client: HiveClient) =
+ () => client
+
+ def getHiveExternalCatalog(catalog: ExternalCatalogWithListener) =
+ catalog.unwrapped.asInstanceOf[HiveExternalCatalog]
+}
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
index c1a5862..e7ca31b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -24,6 +24,7 @@
val dataMapName = "bloom_dm"
override protected def beforeAll(): Unit = {
+ sqlContext.sparkContext.setLogLevel("info")
deleteFile(bigFile)
new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
createFile(bigFile, line = 2000)
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
deleted file mode 100644
index e69de29..0000000
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ /dev/null
diff --git a/pom.xml b/pom.xml
index 66214df..f74a11b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -487,7 +487,7 @@
<profiles>
<profile>
- <!--This profile does not build spark module, so user should explicitly give spark profile also like spark-2.1 -->
+ <!--This profile does not build spark module, so user should explicitly give spark profile -->
<id>build-with-format</id>
<modules>
<module>format</module>
@@ -501,103 +501,6 @@
</properties>
</profile>
<profile>
- <id>spark-2.1</id>
- <properties>
- <spark.version>2.1.0</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.eluder.coveralls</groupId>
- <artifactId>coveralls-maven-plugin</artifactId>
- <version>4.3.0</version>
- <configuration>
- <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
- <sourceEncoding>UTF-8</sourceEncoding>
- <jacocoReports>
- <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
- </jacocoReport>
- </jacocoReports>
- <sourceDirectories>
- <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
- </sourceDirectories>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>spark-2.2</id>
- <properties>
- <spark.version>2.2.1</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.8</scala.version>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.eluder.coveralls</groupId>
- <artifactId>coveralls-maven-plugin</artifactId>
- <version>4.3.0</version>
- <configuration>
- <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
- <sourceEncoding>UTF-8</sourceEncoding>
- <jacocoReports>
- <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
- </jacocoReport>
- </jacocoReports>
- <sourceDirectories>
- <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
- </sourceDirectories>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
<id>spark-2.3</id>
<activation>
<activeByDefault>true</activeByDefault>
@@ -626,11 +529,64 @@
<sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/below2.4</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/spark2.3</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/below2.4</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
+ </sourceDirectories>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.4</id>
+ <properties>
+ <spark.version>2.4.4</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.eluder.coveralls</groupId>
+ <artifactId>coveralls-maven-plugin</artifactId>
+ <version>4.3.0</version>
+ <configuration>
+ <repoToken>opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y</repoToken>
+ <sourceEncoding>UTF-8</sourceEncoding>
+ <jacocoReports>
+ <jacocoReport>${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml
+ </jacocoReport>
+ </jacocoReports>
+ <sourceDirectories>
+ <sourceDirectory>${basedir}/common/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/core/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/processing/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2AndAbove</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.3And2.4</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.4</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark-common/src/main/spark2.4</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common-test/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/hive/src/main/scala</sourceDirectory>