/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.spark;

import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.JavaRowFormat;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.adapter.enumerable.RexImpTable;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.BlockStatement;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.ParameterExpression;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Values;
import org.apache.calcite.rel.logical.LogicalCalc;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.metadata.RelMdUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexMultisetUtil;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import scala.Tuple2;

import java.lang.reflect.Type;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Random;

/**
* Rules for the {@link SparkRel#CONVENTION Spark calling convention}.
*
* @see JdbcToSparkConverterRule
*/
public abstract class SparkRules {
private SparkRules() {}
/** Rule that converts from Spark to enumerable convention. */
public static final SparkToEnumerableConverterRule SPARK_TO_ENUMERABLE =
SparkToEnumerableConverterRule.DEFAULT_CONFIG
.toRule(SparkToEnumerableConverterRule.class);
/** Rule that converts from enumerable to Spark convention. */
public static final EnumerableToSparkConverterRule ENUMERABLE_TO_SPARK =
EnumerableToSparkConverterRule.DEFAULT_CONFIG
.toRule(EnumerableToSparkConverterRule.class);
/** Rule that converts a {@link org.apache.calcite.rel.logical.LogicalCalc}
* to a {@link org.apache.calcite.adapter.spark.SparkRules.SparkCalc}. */
public static final SparkCalcRule SPARK_CALC_RULE =
SparkCalcRule.DEFAULT_CONFIG.toRule(SparkCalcRule.class);
/** Rule that implements VALUES operator in Spark convention. */
public static final SparkValuesRule SPARK_VALUES_RULE =
SparkValuesRule.DEFAULT_CONFIG.toRule(SparkValuesRule.class);
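
  /** Returns all rules needed to plan queries in the Spark calling
   * convention.
   *
   * <p>A minimal registration sketch (assuming a
   * {@link org.apache.calcite.plan.RelOptPlanner} named {@code planner}
   * has already been created elsewhere):
   *
   * <pre>{@code
   * for (RelOptRule rule : SparkRules.rules()) {
   *   planner.addRule(rule);
   * }
   * }</pre>
   */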
public static List<RelOptRule> rules() {
return ImmutableList.of(
// TODO: add SparkProjectRule, SparkFilterRule, SparkProjectToCalcRule,
// SparkFilterToCalcRule, and remove the following 2 rules.
CoreRules.PROJECT_TO_CALC,
CoreRules.FILTER_TO_CALC,
ENUMERABLE_TO_SPARK,
SPARK_TO_ENUMERABLE,
SPARK_VALUES_RULE,
SPARK_CALC_RULE);
}
/** Planner rule that converts from enumerable to Spark convention.
*
* @see #ENUMERABLE_TO_SPARK */
static class EnumerableToSparkConverterRule extends ConverterRule {
/** Default configuration. */
static final Config DEFAULT_CONFIG = Config.INSTANCE
.withConversion(RelNode.class, EnumerableConvention.INSTANCE,
SparkRel.CONVENTION, "EnumerableToSparkConverterRule")
.withRuleFactory(EnumerableToSparkConverterRule::new);
EnumerableToSparkConverterRule(Config config) {
super(config);
}
@Override public RelNode convert(RelNode rel) {
return new EnumerableToSparkConverter(rel.getCluster(),
rel.getTraitSet().replace(SparkRel.CONVENTION), rel);
}
}
/** Planner rule that converts from Spark to enumerable convention.
*
* @see #SPARK_TO_ENUMERABLE */
static class SparkToEnumerableConverterRule extends ConverterRule {
    /** Default configuration. */
    static final Config DEFAULT_CONFIG = Config.INSTANCE
.withConversion(RelNode.class, SparkRel.CONVENTION,
EnumerableConvention.INSTANCE, "SparkToEnumerableConverterRule")
.withRuleFactory(SparkToEnumerableConverterRule::new);
SparkToEnumerableConverterRule(Config config) {
super(config);
}
@Override public RelNode convert(RelNode rel) {
return new SparkToEnumerableConverter(rel.getCluster(),
rel.getTraitSet().replace(EnumerableConvention.INSTANCE), rel);
}
}
/** Planner rule that implements VALUES operator in Spark convention.
*
* @see #SPARK_VALUES_RULE */
public static class SparkValuesRule extends ConverterRule {
/** Default configuration. */
static final Config DEFAULT_CONFIG = Config.INSTANCE
.withConversion(LogicalValues.class, Convention.NONE,
SparkRel.CONVENTION, "SparkValuesRule")
.withRuleFactory(SparkValuesRule::new);
SparkValuesRule(Config config) {
super(config);
}
@Override public RelNode convert(RelNode rel) {
LogicalValues values = (LogicalValues) rel;
return new SparkValues(
values.getCluster(),
values.getRowType(),
values.getTuples(),
values.getTraitSet().replace(getOutTrait()));
}
}
/** VALUES construct implemented in Spark. */
public static class SparkValues extends Values implements SparkRel {
SparkValues(
RelOptCluster cluster,
RelDataType rowType,
ImmutableList<ImmutableList<RexLiteral>> tuples,
RelTraitSet traitSet) {
super(cluster, rowType, tuples, traitSet);
}
@Override public RelNode copy(
RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();
return new SparkValues(
getCluster(), rowType, tuples, traitSet);
}
    @Override public Result implementSpark(Implementor implementor) {
/*
return Linq4j.asSpark(
new Object[][] {
new Object[] {1, 2},
new Object[] {3, 4}
});
*/
final JavaTypeFactory typeFactory =
(JavaTypeFactory) getCluster().getTypeFactory();
final BlockBuilder builder = new BlockBuilder();
final PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(),
getRowType(),
JavaRowFormat.CUSTOM);
final Type rowClass = physType.getJavaRowType();
final List<Expression> expressions = new ArrayList<>();
final List<RelDataTypeField> fields = rowType.getFieldList();
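      // Translate each literal tuple into an expression that constructs
      // one row object in the generated code.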
for (List<RexLiteral> tuple : tuples) {
final List<Expression> literals = new ArrayList<>();
for (Pair<RelDataTypeField, RexLiteral> pair
: Pair.zip(fields, tuple)) {
literals.add(
RexToLixTranslator.translateLiteral(
pair.right,
pair.left.getType(),
typeFactory,
RexImpTable.NullAs.NULL));
}
expressions.add(physType.record(literals));
}
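      // Emit the generated statement that builds the RDD, roughly:
      //   return SparkRuntime.createRdd(
      //       SparkRuntime.getSparkContext(root), new Row[] {...});
      // (a sketch; the exact runtime helpers are whatever
      // SparkMethod.ARRAY_TO_RDD and SparkMethod.GET_SPARK_CONTEXT
      // resolve to).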
builder.add(
Expressions.return_(null,
Expressions.call(SparkMethod.ARRAY_TO_RDD.method,
Expressions.call(SparkMethod.GET_SPARK_CONTEXT.method,
implementor.getRootExpression()),
Expressions.newArrayInit(Primitive.box(rowClass),
expressions))));
return implementor.result(physType, builder.toBlock());
}
}
/**
 * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalCalc} to a
 * {@link org.apache.calcite.adapter.spark.SparkRules.SparkCalc}.
*
* @see #SPARK_CALC_RULE
*/
private static class SparkCalcRule extends ConverterRule {
/** Default configuration. */
static final Config DEFAULT_CONFIG = Config.INSTANCE
.withConversion(LogicalCalc.class, Convention.NONE, SparkRel.CONVENTION,
"SparkCalcRule")
.withRuleFactory(SparkCalcRule::new);
SparkCalcRule(Config config) {
super(config);
}
@Override public RelNode convert(RelNode rel) {
final LogicalCalc calc = (LogicalCalc) rel;
// If there's a multiset, let FarragoMultisetSplitter work on it
// first.
final RexProgram program = calc.getProgram();
if (RexMultisetUtil.containsMultiset(program)
|| program.containsAggs()) {
return null;
}
return new SparkCalc(
rel.getCluster(),
rel.getTraitSet().replace(SparkRel.CONVENTION),
convert(calc.getInput(),
calc.getInput().getTraitSet().replace(SparkRel.CONVENTION)),
program);
}
}
/** Implementation of {@link org.apache.calcite.rel.core.Calc}
* in Spark convention. */
public static class SparkCalc extends SingleRel implements SparkRel {
private final RexProgram program;
public SparkCalc(RelOptCluster cluster,
RelTraitSet traitSet,
RelNode input,
RexProgram program) {
super(cluster, traitSet, input);
assert getConvention() == SparkRel.CONVENTION;
assert !program.containsAggs();
this.program = program;
this.rowType = program.getOutputRowType();
}
@Deprecated // to be removed before 2.0
public SparkCalc(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
RexProgram program, int flags) {
this(cluster, traitSet, input, program);
Util.discard(flags);
}
@Override public RelWriter explainTerms(RelWriter pw) {
return program.explainCalc(super.explainTerms(pw));
}
@Override public double estimateRowCount(RelMetadataQuery mq) {
return RelMdUtil.estimateFilteredRows(getInput(), program, mq);
}
@Override public RelOptCost computeSelfCost(RelOptPlanner planner,
RelMetadataQuery mq) {
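      // Cost model: the row component is this node's estimated output row
      // count; CPU is approximated as one unit per program expression
      // evaluated per input row; no I/O is charged.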
double dRows = mq.getRowCount(this);
double dCpu = mq.getRowCount(getInput())
* program.getExprCount();
double dIo = 0;
return planner.getCostFactory().makeCost(dRows, dCpu, dIo);
}
    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new SparkCalc(
getCluster(),
traitSet,
sole(inputs),
program);
}
@Deprecated // to be removed before 2.0
public int getFlags() {
return 1;
}
    @Override public Result implementSpark(Implementor implementor) {
final JavaTypeFactory typeFactory = implementor.getTypeFactory();
final BlockBuilder builder = new BlockBuilder();
final SparkRel child = (SparkRel) getInput();
final Result result = implementor.visitInput(this, 0, child);
final PhysType physType =
PhysTypeImpl.of(
typeFactory, getRowType(), JavaRowFormat.CUSTOM);
// final RDD<Employee> inputRdd = <<child adapter>>;
// return inputRdd.flatMap(
// new FlatMapFunction<Employee, X>() {
// public List<X> call(Employee e) {
// if (!(e.empno < 10)) {
// return Collections.emptyList();
// }
// return Collections.singletonList(
      //           new X(...));
// }
// })
Type outputJavaType = physType.getJavaRowType();
final Type rddType =
Types.of(
JavaRDD.class, outputJavaType);
Type inputJavaType = result.physType.getJavaRowType();
final Expression inputRdd_ =
builder.append(
"inputRdd",
result.block);
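      // builder2 accumulates the body of the generated flat-map function:
      // test the program's condition first, then project the output row.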
BlockBuilder builder2 = new BlockBuilder();
final ParameterExpression e_ =
Expressions.parameter(inputJavaType, "e");
if (program.getCondition() != null) {
Expression condition =
RexToLixTranslator.translateCondition(
program,
typeFactory,
builder2,
new RexToLixTranslator.InputGetterImpl(
Collections.singletonList(
Pair.of((Expression) e_, result.physType))),
null, implementor.getConformance());
builder2.add(
Expressions.ifThen(
Expressions.not(condition),
Expressions.return_(null,
Expressions.call(
BuiltInMethod.COLLECTIONS_EMPTY_LIST.method))));
}
      final SqlConformance conformance = implementor.getConformance();
List<Expression> expressions =
RexToLixTranslator.translateProjects(
program,
typeFactory,
conformance,
builder2,
null,
DataContext.ROOT,
new RexToLixTranslator.InputGetterImpl(
Collections.singletonList(
Pair.of((Expression) e_, result.physType))),
null);
builder2.add(
Expressions.return_(null,
Expressions.convert_(
Expressions.call(
BuiltInMethod.COLLECTIONS_SINGLETON_LIST.method,
physType.record(expressions)),
List.class)));
final BlockStatement callBody = builder2.toBlock();
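      // Wrap the generated body in a CalciteFlatMapFunction and emit
      // "return inputRdd.flatMap(<function>)" as this node's result.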
builder.add(
Expressions.return_(
null,
Expressions.call(
inputRdd_,
SparkMethod.RDD_FLAT_MAP.method,
Expressions.lambda(
SparkRuntime.CalciteFlatMapFunction.class,
callBody,
e_))));
return implementor.result(physType, builder.toBlock());
}
}

  // Play area: ad-hoc experiments with the Spark Java API. Not exercised by
  // the planner rules above; useful for manual smoke-testing.
  public static void main(String[] args) {
final JavaSparkContext sc = new JavaSparkContext("local[1]", "calcite");
final JavaRDD<String> file = sc.textFile("/usr/share/dict/words");
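    // Count the distinct first letters among the words.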
System.out.println(
file.map(s -> s.substring(0, Math.min(s.length(), 1)))
.distinct().count());
file.cache();
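    // Group words by first letter and print the size of each group.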
String s =
file.groupBy((Function<String, String>) s1 -> s1.substring(0, Math.min(s1.length(), 1))
//CHECKSTYLE: IGNORE 1
).map((Function<Tuple2<String, Iterable<String>>, Object>) pair ->
pair._1() + ":" + Iterables.size(pair._2())).collect().toString();
System.out.print(s);
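    // Parallelize a lazily evaluated list of random numbers and group them
    // by parity; the println calls show when Spark actually reads the list.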
final JavaRDD<Integer> rdd = sc.parallelize(
new AbstractList<Integer>() {
final Random random = new Random();
@Override public Integer get(int index) {
System.out.println("get(" + index + ")");
return random.nextInt(100);
}
@Override public int size() {
System.out.println("size");
return 10;
}
});
System.out.println(
rdd.groupBy((Function<Integer, Integer>) integer -> integer % 2).collect().toString());
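    // Flat-map words starting with "a" to (UPPERCASE, length) pairs and
    // print the first five.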
System.out.println(
file.flatMap((FlatMapFunction<String, Pair<String, Integer>>) x -> {
if (!x.startsWith("a")) {
return Collections.emptyIterator();
}
return Collections.singletonList(
Pair.of(x.toUpperCase(Locale.ROOT), x.length())).iterator();
})
.take(5)
.toString());
}
}