blob: b96dd5f01583a684254844369071d3c1b5241862 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.spark;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.JavaRowFormat;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.BlockStatement;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import java.util.List;
/**
* Relational expression that converts input of
* {@link org.apache.calcite.adapter.spark.SparkRel#CONVENTION Spark convention}
* into {@link org.apache.calcite.adapter.enumerable.EnumerableConvention}.
*
* <p>Concretely, this means calling the
* {@link org.apache.spark.api.java.JavaRDD#collect()} method of an RDD
* and converting it to enumerable.</p>
*/
public class SparkToEnumerableConverter
extends ConverterImpl
implements EnumerableRel {
protected SparkToEnumerableConverter(RelOptCluster cluster,
RelTraitSet traits,
RelNode input) {
super(cluster, ConventionTraitDef.INSTANCE, traits, input);
}
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new SparkToEnumerableConverter(
getCluster(), traitSet, sole(inputs));
}
@Override public RelOptCost computeSelfCost(RelOptPlanner planner,
RelMetadataQuery mq) {
return super.computeSelfCost(planner, mq).multiplyBy(.01);
}
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
// Generate:
// RDD rdd = ...;
// return SparkRuntime.asEnumerable(rdd);
final BlockBuilder list = new BlockBuilder();
final SparkRel child = (SparkRel) getInput();
final PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(),
getRowType(),
JavaRowFormat.CUSTOM);
SparkRel.Implementor sparkImplementor =
new SparkImplementorImpl(implementor);
final SparkRel.Result result = child.implementSpark(sparkImplementor);
final Expression rdd = list.append("rdd", result.block);
final Expression enumerable =
list.append(
"enumerable",
Expressions.call(
SparkMethod.AS_ENUMERABLE.method,
rdd));
list.add(
Expressions.return_(null, enumerable));
return implementor.result(physType, list.toBlock());
}
/** Implementation of
* {@link org.apache.calcite.adapter.spark.SparkRel.Implementor}. */
private static class SparkImplementorImpl extends SparkRel.Implementor {
private final EnumerableRelImplementor implementor;
public SparkImplementorImpl(EnumerableRelImplementor implementor) {
super(implementor.getRexBuilder());
this.implementor = implementor;
}
public SparkRel.Result result(PhysType physType,
BlockStatement blockStatement) {
return new SparkRel.Result(physType, blockStatement);
}
SparkRel.Result visitInput(SparkRel parent, int ordinal, SparkRel input) {
if (parent != null) {
assert input == parent.getInputs().get(ordinal);
}
return input.implementSpark(this);
}
public JavaTypeFactory getTypeFactory() {
return implementor.getTypeFactory();
}
}
}
// End SparkToEnumerableConverter.java