blob: d14f59ab8296ffe5ce3e291a5fda83684dfab4be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
@SuppressWarnings({"serial"})
public class ReduceByConverter implements RDDConverter<Tuple, Tuple, POReduceBySpark> {
    private static final Log LOG = LogFactory.getLog(ReduceByConverter.class);

    private static final TupleFactory tf = TupleFactory.getInstance();

    /**
     * Converts a {@link POReduceBySpark} into a Spark reduce-by-key pipeline.
     *
     * <p>Every input tuple is first run through the operator's local rearrange, producing
     * {@code (IndexedKey, (key, value))} pairs. When secondary-key sorting is enabled those pairs
     * are handed to {@code SecondaryKeySortUtil.handleSecondarySort}. Otherwise values sharing a
     * key are combined with {@code reduceByKey} via {@link MergeValuesFunction}, and each merged
     * entry is packaged for the operator's packager by {@link ToTupleFunction}.
     *
     * @param predecessors the input RDDs; exactly one is expected
     * @param op the reduce-by operator being converted
     * @return the converted RDD
     * @throws IOException propagated from the Pig/Spark helpers called here
     */
    @Override
    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException {
        SparkUtil.assertPredecessorSize(predecessors, op, 1);
        int parallelism = SparkPigContext.get().getParallelism(predecessors, op);
        RDD<Tuple> rdd = predecessors.get(0);
        RDD<Tuple2<IndexedKey, Tuple>> rddPair = rdd.map(
                new LocalRearrangeFunction(op.getLROp(), op.isUseSecondaryKey(), op.getSecondarySortOrder()),
                SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
        if (op.isUseSecondaryKey()) {
            return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPKGOp());
        }
        PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions = new PairRDDFunctions<>(rddPair,
                SparkUtil.getManifest(IndexedKey.class),
                SparkUtil.getManifest(Tuple.class), null);
        RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey(
                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
                new MergeValuesFunction(op));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism);
        }
        return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
    }

    /**
     * Attaches a single bag of values for {@code key} to the operator's packager and returns
     * the tuple the packager produces.
     */
    private static Tuple packageBag(POReduceBySpark poReduce, Object key, DataBag bag)
            throws ExecException {
        poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{bag}, new boolean[]{true});
        return (Tuple) poReduce.getPKGOp().getPkgr().getNext().result;
    }

    /**
     * Returns a new tuple holding every field of {@code reduced} except the first one equal to
     * {@code key}. Only that single key field is removed, so a value that happens to equal the
     * key is kept. Null fields are kept as well.
     *
     * @param reduced the reduce operator's output, expected to look like (key, v1, v2, ...)
     * @param key the (non-null) key to strip
     */
    private static Tuple stripKey(Tuple reduced, Object key) {
        Tuple valueTuple = tf.newTuple();
        boolean keySkipped = false;
        for (Object o : reduced.getAll()) {
            if (!keySkipped && key.equals(o)) {
                keySkipped = true;
                continue;
            }
            valueTuple.append(o);
        }
        return valueTuple;
    }

    /**
     * Given two input tuples, this function outputs the resultant tuple.
     * Additionally, it packages the input tuples to ensure the Algebraic Functions can work on them.
     *
     * <p>Both inputs have the shape (key, valueTuple). The output has the same shape, so it can be
     * merged again by {@code reduceByKey}.
     */
    private static final class MergeValuesFunction extends AbstractFunction2<Tuple, Tuple, Tuple>
            implements Serializable {
        private final POReduceBySpark poReduce;

        public MergeValuesFunction(POReduceBySpark poReduce) {
            this.poReduce = poReduce;
        }

        @Override
        public Tuple apply(Tuple v1, Tuple v2) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2);
            }
            Tuple result = tf.newTuple(2);
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            try {
                // Package the input tuples so they can be processed by Algebraic functions.
                Object key = v1.get(0);
                if (key == null) {
                    // Field 0 of the result stays null; "" stands in for the key while packaging.
                    key = "";
                } else {
                    result.set(0, key);
                }
                bag.add((Tuple) v1.get(1));
                bag.add((Tuple) v2.get(1));
                Tuple packagedTuple = packageBag(poReduce, key, bag);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("MergeValuesFunction packagedTuple : " + packagedTuple);
                }
                // Perform the operation
                poReduce.attachInput(packagedTuple);
                Result r = poReduce.getNext(poReduce.getResultType());
                // Ensure output is consistent with the output of LocalRearrangeFunction.
                // If we return r.result, the result will be something like this:
                // (ABC,(2),(3)) - A tuple with key followed by values.
                // But, we want the result to look like this:
                // (ABC,((2),(3))) - A tuple with key and a value tuple (containing values).
                // Hence, the construction of a new value tuple without the key field.
                result.set(1, stripKey((Tuple) r.result, key));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("MergeValuesFunction out : " + result);
                }
                return result;
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * This function transforms the Tuple to ensure it is packaged as per requirements of the Operator's packager.
     *
     * <p>The input value has the shape (key, valueTuple). The value tuple is placed in a
     * single-element bag and run through the packager for that key.
     */
    private static final class ToTupleFunction extends AbstractFunction1<Tuple2<IndexedKey, Tuple>, Tuple>
            implements Serializable {
        private final POReduceBySpark poReduce;

        public ToTupleFunction(POReduceBySpark poReduce) {
            this.poReduce = poReduce;
        }

        @Override
        public Tuple apply(Tuple2<IndexedKey, Tuple> v1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("ToTupleFunction in : " + v1);
            }
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            Tuple packagedTuple;
            try {
                Object key = v1._2().get(0);
                bag.add((Tuple) v1._2().get(1));
                packagedTuple = packageBag(poReduce, key, bag);
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("ToTupleFunction out : " + packagedTuple);
            }
            return packagedTuple;
        }
    }

    /**
     * Runs each input tuple through the local rearrange operator. The operator's output is
     * (index, key, value), and it is turned into
     * Tuple2&lt;IndexedKey(index, key), Tuple(key, value)&gt;.
     */
    private static class LocalRearrangeFunction extends
            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
        private final POLocalRearrange lra;
        private final boolean useSecondaryKey;
        // Only retained when secondary-key sorting is enabled.
        private final boolean[] secondarySortOrder;

        public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) {
            this.lra = lra;
            this.useSecondaryKey = useSecondaryKey;
            this.secondarySortOrder = useSecondaryKey ? secondarySortOrder : null;
        }

        @Override
        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("LocalRearrangeFunction in " + t);
            }
            Result result;
            try {
                lra.setInputs(null);
                lra.attachInput(t);
                result = lra.getNextTuple();
                if (result == null) {
                    throw new RuntimeException(
                            "Null response found for LocalRearange on tuple: "
                                    + t);
                }
                switch (result.returnStatus) {
                    case POStatus.STATUS_OK:
                        // Fields read below: 0 = index (Byte), 1 = key, 2 = value.
                        Tuple resultTuple = (Tuple) result.result;
                        Object key = resultTuple.get(1);
                        IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
                        if (useSecondaryKey) {
                            indexedKey.setUseSecondaryKey(useSecondaryKey);
                            indexedKey.setSecondarySortOrder(secondarySortOrder);
                        }
                        Tuple outValue = tf.newTuple();
                        outValue.append(key);
                        outValue.append(resultTuple.get(2));
                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
                                outValue);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("LocalRearrangeFunction out " + out);
                        }
                        return out;
                    default:
                        throw new RuntimeException(
                                "Unexpected response code from operator "
                                        + lra + " : " + result);
                }
            } catch (ExecException e) {
                throw new RuntimeException(
                        "Couldn't do LocalRearange on tuple: " + t, e);
            }
        }
    }
}