/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
import org.apache.spark.util.{ScalaCompilerUtil, Utils}
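
/**
 * Generates Spark-version-specific glue at runtime. The optimizer and Expand
 * factories are compiled from the source templates in [[CodeTemplates]] (or built
 * reflectively) so the same CarbonData binary can run against Spark 1.5.x and
 * 1.6.x, whose Optimizer and Expand signatures differ.
 */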
private[sql] class CodeGenerateFactory(version: String) {
  // Spark 1.6.2/1.6.3 changed Optimizer's constructor to take the conf, so a
  // dedicated template is compiled for those releases.
  val optimizerFactory = if (version == "1.6.2" || version == "1.6.3") {
    ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_6_OptimizerString)
      .asInstanceOf[AbstractCarbonOptimizerFactory]
  } else if (version.startsWith("1.6") || version.startsWith("1.5")) {
    ScalaCompilerUtil.compiledCode(CodeTemplates.defaultOptimizerString)
      .asInstanceOf[AbstractCarbonOptimizerFactory]
  } else {
    throw new UnsupportedOperationException(s"Spark version $version is not supported")
  }

  // Expand's shape differs between Spark 1.5 and 1.6, so build it from a compiled
  // template (1.5) or reflectively (1.6).
  val expandFactory = if (version.startsWith("1.5")) {
    ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_5ExpandString)
      .asInstanceOf[AbstractCarbonExpandFactory]
  } else if (version.startsWith("1.6")) {
    new AbstractCarbonExpandFactory {
      override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
        val loader = Utils.getContextOrSparkClassLoader
        try {
          // Spark 1.6's Expand takes (projections, output, child); instantiate it
          // reflectively to avoid compiling against that signature directly.
          val cons = loader.loadClass("org.apache.spark.sql.catalyst.plans.logical.Expand")
            .getDeclaredConstructors
          cons.head.setAccessible(true)
          cons.head.newInstance(expand.projections, expand.output, child).asInstanceOf[Expand]
        } catch {
          // If reflection fails on an unexpected Spark build, return null.
          case _: Exception => null
        }
      }
    }
  } else {
    throw new UnsupportedOperationException(s"Spark version $version is not supported")
  }
}
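
/**
 * Holds the process-wide [[CodeGenerateFactory]] instance. `init` is expected to be
 * called once with the running Spark version before `getInstance`; it is not
 * synchronized, so it should be invoked from a single thread during startup.
 *
 * A minimal usage sketch (the version string here is illustrative):
 * {{{
 *   CodeGenerateFactory.init("1.6.1")
 *   val factory = CodeGenerateFactory.getInstance()
 * }}}
 */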
object CodeGenerateFactory {
private var codeGenerateFactory: CodeGenerateFactory = _
def init(version: String): Unit = {
if (codeGenerateFactory == null) {
codeGenerateFactory = new CodeGenerateFactory(version)
}
}
def getInstance(): CodeGenerateFactory = {
codeGenerateFactory
}
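
  /**
   * Loads Spark's built-in DefaultOptimizer reflectively. In Spark 1.5/1.6 it is a
   * Scala object, so the singleton's class (`name + "$"`) is instantiated through
   * its private constructor; the fallback covers builds where DefaultOptimizer is
   * instead a class taking a [[CatalystConf]]. The SparkContext parameter is
   * currently unused.
   */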
def createDefaultOptimizer(conf: CatalystConf, sc: SparkContext): Optimizer = {
val name = "org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer"
val loader = Utils.getContextOrSparkClassLoader
    try {
      // DefaultOptimizer is a Scala object here, so load the singleton's class and
      // invoke its (private) no-arg constructor.
      val cons = loader.loadClass(name + "$").getDeclaredConstructors
      cons.head.setAccessible(true)
      cons.head.newInstance().asInstanceOf[Optimizer]
    } catch {
      case _: Exception =>
        // Fall back to builds where DefaultOptimizer is a class taking the conf.
        loader.loadClass(name).getConstructor(classOf[CatalystConf])
          .newInstance(conf).asInstanceOf[Optimizer]
    }
}
}
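
/**
 * Scala source snippets compiled at runtime by [[ScalaCompilerUtil]]. Each template
 * evaluates to a factory instance, which keeps this file free of a compile-time
 * dependency on any single Spark version's Optimizer or Expand signature.
 */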
object CodeTemplates {
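  // Spark 1.6.2/1.6.3: Optimizer's constructor takes the conf.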
val spark1_6_OptimizerString =
s"""
import org.apache.spark.sql._;
import org.apache.spark.sql.optimizer._;
import org.apache.spark.sql.catalyst.plans.logical._;
import org.apache.spark.sql.catalyst._;
import org.apache.spark.sql.catalyst.optimizer.Optimizer;
new AbstractCarbonOptimizerFactory {
override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
class CarbonOptimizer1(optimizer: Optimizer, conf: CarbonSQLConf)
extends Optimizer(conf) {
override val batches = Nil;
override def execute(plan: LogicalPlan): LogicalPlan = {
CarbonOptimizer.execute(plan, optimizer);
}
}
new CarbonOptimizer1(optimizer, conf);
}
}
"""
val defaultOptimizerString =
s"""
import org.apache.spark.sql._;
import org.apache.spark.sql.optimizer._;
import org.apache.spark.sql.catalyst.plans.logical._;
import org.apache.spark.sql.catalyst._;
import org.apache.spark.sql.catalyst.optimizer.Optimizer;
new AbstractCarbonOptimizerFactory {
override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
class CarbonOptimizer2(optimizer: Optimizer, conf: CarbonSQLConf) extends Optimizer {
val batches = Nil;
override def execute(plan: LogicalPlan): LogicalPlan = {
CarbonOptimizer.execute(plan, optimizer);
}
}
new CarbonOptimizer2(optimizer, conf);
}
}
"""
val spark1_5ExpandString =
s"""
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
new AbstractCarbonExpandFactory {
override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
Expand(expand.bitmasks, expand.groupByExprs, expand.gid, child)
}
}
"""
}
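
/**
 * Implemented by the runtime-compiled optimizer templates above to wrap Spark's
 * Optimizer with CarbonOptimizer.
 */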
abstract class AbstractCarbonOptimizerFactory {
  def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer
}
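
/**
 * Implemented by the version-specific Expand factories above to rebuild an Expand
 * node with a new child plan.
 */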
abstract class AbstractCarbonExpandFactory {
  def createExpand(expand: Expand, child: LogicalPlan): Expand
}