/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.command.stream

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.DataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.{StringType, StructType}

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.spark.StreamingOption
import org.apache.carbondata.spark.util.{CarbonSparkUtil, Util}
import org.apache.carbondata.stream.StreamJobManager

/**
 * This command starts a Spark Structured Streaming job that continuously inserts rows from
 * the stream source table into the sink table.
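 *
 * Illustrative usage (based on the CarbonData StreamSQL syntax; the exact DDL grammar is
 * defined by the SQL parser, not by this class):
 * {{{
 *   CREATE STREAM ingest ON TABLE sink_table
 *   STMPROPERTIES('trigger' = 'ProcessingTime', 'interval' = '10 seconds')
 *   AS SELECT * FROM source_table WHERE id % 2 = 1
 * }}}
 *
 * @param streamName    name of the stream job
 * @param sinkDbName    database of the sink table (current database when not given)
 * @param sinkTableName name of the sink (target) table
 * @param ifNotExists   IF NOT EXISTS flag of the DDL, passed on to StreamJobManager
 * @param optionMap     stream options given in the DDL, for example the trigger interval
 * @param query         the stream query (SELECT statement) reading from the source table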
 */
case class CarbonCreateStreamCommand(
    streamName: String,
    sinkDbName: Option[String],
    sinkTableName: String,
    ifNotExists: Boolean,
    optionMap: Map[String, String],
    query: String
) extends DataCommand {

  override def output: Seq[Attribute] =
    Seq(AttributeReference("Stream Name", StringType, nullable = false)(),
      AttributeReference("JobId", StringType, nullable = false)(),
      AttributeReference("Status", StringType, nullable = false)())
  override def processData(sparkSession: SparkSession): Seq[Row] = {
    setAuditTable(CarbonEnv.getDatabaseName(sinkDbName)(sparkSession), sinkTableName)
    setAuditInfo(Map("streamName" -> streamName, "query" -> query) ++ optionMap)
    val inputQuery = sparkSession.sql(query)
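    // find all CarbonData tables in the resolved plan that are flagged as streaming sources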
    val sourceTableSeq = inputQuery.logicalPlan collect {
      case r: LogicalRelation
        if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
        r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    }
    if (sourceTableSeq.isEmpty) {
      throw new MalformedCarbonCommandException(
        "Must specify stream source table in the stream query")
    }
    if (sourceTableSeq.size > 1) {
      throw new MalformedCarbonCommandException(
        "Stream query on more than one stream source table is not supported")
    }
    val sourceTable = sourceTableSeq.head
    val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
    val format = sourceTable.getFormat
    if (format == null) {
      throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
    }
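    // Kafka and socket sources need a rewritten plan that parses the raw 'value' column into
    // the schema of the source table; other formats only need the logical relation replaced
    // by a streaming relation.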
    val updatedQuery = if (format.equals("kafka")) {
      shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable)
      shouldHaveProperty(tblProperty, "subscribe", sourceTable)
      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty.asScala)
    } else if (format.equals("socket")) {
      shouldHaveProperty(tblProperty, "host", sourceTable)
      shouldHaveProperty(tblProperty, "port", sourceTable)
      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty.asScala)
    } else {
      // Replace the logical relation with a streaming relation created
      // from the stream source table
      inputQuery.logicalPlan transform {
        case r: LogicalRelation
          if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
             r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource
          => prepareStreamingRelation(sparkSession, r)
        case plan: LogicalPlan => plan
      }
    }
    if (sourceTable == null) {
      throw new MalformedCarbonCommandException("Must specify stream source table in the query")
    }
    // set the CSV delimiter option used by the CSV row parser: take it from the source
    // table's 'delimiter' property (defaulting to ','), in addition to the user options
    val newMap = mutable.Map[String, String]()
    optionMap.foreach(x => newMap(x._1) = x._2)
    newMap(CSVInputFormat.DELIMITER) = tblProperty.asScala.getOrElse("delimiter", ",")
    // start the streaming job
    val jobId = StreamJobManager.startStream(
      sparkSession = sparkSession,
      ifNotExists = ifNotExists,
      streamName = streamName,
      sourceTable = sourceTable,
      sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
      query = query,
      streamDf = Dataset.ofRows(sparkSession, updatedQuery),
      options = new StreamingOption(newMap.toMap)
    )
    Seq(Row(streamName, jobId, "RUNNING"))
  }

  /**
   * Create a new plan for a stream query on a Kafka or socket source table.
   * This is required because the data read from Kafka or from the socket has to be converted
   * to the schema of the source table. The returned logical plan contains the complete plan
   * tree of the original plan, with the logical relation replaced by a streaming relation.
   *
   * @param sparkSession spark session
   * @param inputQuery   stream query from user
   * @param sourceTable  source table (Kafka or socket source table)
   * @param sourceName   source name, "kafka" or "socket"
   * @param tblProperty  table property of the source table
   * @return a new logical plan
   */
  private def createPlan(
      sparkSession: SparkSession,
      inputQuery: DataFrame,
      sourceTable: CarbonTable,
      sourceName: String,
      tblProperty: mutable.Map[String, String]): LogicalPlan = {
    // We follow 3 steps to generate the new plan:
    // 1. replace the logical relation in the stream query with a streaming relation
    // 2. collect the new ExprIds generated
    // 3. update the stream query plan with the new ExprIds, to make the plan consistent
    // exprList extracts each table column from the '_values' array built from the raw record
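    // e.g. for the second column "name", the generated expression is
    //   "case when size(_values) > 1 then _values[1] else null end AS name"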
    val columnNames = Util.convertToSparkSchema(sourceTable).fieldNames
    val exprList = columnNames.zipWithIndex.map {
      case (columnName, i) =>
        s"case when size(_values) > $i then _values[$i] else null end AS $columnName"
    }
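    // maps each output column name to the ExprId generated in the new streaming plan,
    // collected while building the plan and used afterwards to rewrite the original plan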
    val aliasMap = new util.HashMap[String, ExprId]()
    val updatedQuery = inputQuery.logicalPlan transform {
      case r: LogicalRelation
        if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
        // for the Kafka/socket stream source, read the raw 'value' column as a string;
        // it is split or parsed below according to the record format
        val plan = sparkSession.readStream
          .format(sourceName)
          .options(tblProperty)
          .load()
          .selectExpr("CAST(value as string) as _value")
        val recordFormat = tblProperty.getOrElse("record_format", "csv")
        val newPlan = recordFormat match {
          case "csv" =>
            val delimiter = tblProperty.getOrElse("delimiter", ",")
            plan.selectExpr(
              s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
              .selectExpr(exprList: _*)
              .logicalPlan
          case "json" =>
            import org.apache.spark.sql.functions._
            plan
              .select(from_json(col("_value"), Util.convertToSparkSchema(sourceTable)) as "_data")
              .select("_data.*")
              .logicalPlan
        }
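        // the Project created above carries fresh ExprIds; record them per column name so
        // that the rest of the original plan can be re-bound to the new attributes below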
        // collect the newly generated ExprId
        newPlan collect {
          case p @ Project(projectList, child) =>
            projectList.map { expr =>
              aliasMap.put(expr.name, expr.exprId)
            }
            p
        }
        newPlan
      case plan: LogicalPlan => plan
    }
    // transform the stream plan to replace all attributes with the collected ExprIds,
    // so that the rest of the plan stays consistent with the new streaming relation
    val transFormedPlan = updatedQuery transform {
      case p @ Project(projectList: Seq[NamedExpression], child) =>
        val newProjectList = projectList.map { expr =>
          val newExpr = expr transform {
            case attribute: Attribute =>
              val exprId: ExprId = aliasMap.get(attribute.name)
              if (exprId != null) {
                if (exprId.id != attribute.exprId.id) {
                  AttributeReference(
                    attribute.name, attribute.dataType, attribute.nullable,
                    attribute.metadata)(exprId, attribute.qualifier)
                } else {
                  attribute
                }
              } else {
                attribute
              }
          }
          newExpr.asInstanceOf[NamedExpression]
        }
        Project(newProjectList, child)
      case f @ Filter(condition: Expression, child) =>
        val newCondition = condition transform {
          case attribute: Attribute =>
            val exprId: ExprId = aliasMap.get(attribute.name)
            if (exprId != null) {
              if (exprId.id != attribute.exprId.id) {
                AttributeReference(
                  attribute.name, attribute.dataType, attribute.nullable,
                  attribute.metadata)(exprId, attribute.qualifier)
              } else {
                attribute
              }
            } else {
              attribute
            }
        }
        Filter(newCondition, child)
    }
    transFormedPlan
  }

  /**
   * Create a streaming relation from the input logical relation (source table)
   *
   * @param sparkSession    spark session
   * @param logicalRelation source table to convert
   * @return streaming relation created from the source table, reusing the output attributes
   *         of the logical relation
   */
  private def prepareStreamingRelation(
      sparkSession: SparkSession,
      logicalRelation: LogicalRelation): StreamingRelation = {
    val sourceTable = logicalRelation.relation
      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
    val format = sourceTable.getFormat
    if (format == null) {
      throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
    }
    val streamReader = sparkSession
      .readStream
      .schema(getSparkSchema(sourceTable))
      .format(format)
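    // for file based sources, the 'path' tblproperty is the directory that the streaming
    // job monitors for new files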
    val dataFrame: DataFrame = format match {
      case "csv" | "text" | "json" | "parquet" =>
        shouldHaveProperty(tblProperty, "path", sourceTable)
        streamReader.load(tblProperty.get("path"))
      case other =>
        throw new MalformedCarbonCommandException(s"Streaming from $other is not supported")
    }
    val streamRelation = dataFrame.logicalPlan.asInstanceOf[StreamingRelation]
    // Since the SparkSQL analyzer matches attributes by their ExprId, create a new
    // StreamingRelation that reuses the output attributes of the LogicalRelation
    StreamingRelation(
      streamRelation.dataSource, streamRelation.sourceName, logicalRelation.output)
  }
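
  // validate that a mandatory tblproperty is present on the stream source table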
  private def shouldHaveProperty(
      tblProperty: java.util.Map[String, String],
      propertyName: String,
      sourceTable: CarbonTable): Unit = {
    if (!tblProperty.containsKey(propertyName)) {
      throw new MalformedCarbonCommandException(
        s"tblproperty '$propertyName' should be provided for stream source " +
        s"${sourceTable.getDatabaseName}.${sourceTable.getTableName}")
    }
  }
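
  // build the Spark schema of the source table, keeping only columns with a valid schema
  // ordinal (ordinal != -1) and ordering them by schema ordinal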
  private def getSparkSchema(sourceTable: CarbonTable): StructType = {
    val cols = sourceTable.getTableInfo.getFactTable.getListOfColumns.asScala.toArray
    val sortedCols = cols.filter(_.getSchemaOrdinal != -1)
      .sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
    Util.convertToSparkSchema(sourceTable, sortedCols, false)
  }

  override protected def opName: String = "CREATE STREAM"
}