/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.converter;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;

import scala.Product2;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.runtime.AbstractFunction1;

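/**
 * Converter that implements Pig's global rearrange on Spark: the predecessor
 * RDDs of locally rearranged (index, key, value) tuples are cogrouped on
 * their keys to produce (key, bag-of-values) tuples for the downstream
 * package operator.
 */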
@SuppressWarnings({ "serial" })
public class GlobalRearrangeConverter implements
        RDDConverter<Tuple, Tuple, POGlobalRearrangeSpark> {
    private static final Log LOG = LogFactory
            .getLog(GlobalRearrangeConverter.class);

    private static final TupleFactory tf = TupleFactory.getInstance();

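    /**
     * Converts the locally rearranged predecessor RDDs, whose elements are
     * (index, key, value) tuples, into a single RDD pairing each key with an
     * iterator over the grouped values from all inputs. A sketch of the
     * shapes involved, as derived from the helper functions below:
     *
     *   per predecessor:   (index, key, value) tuples
     *   keyed for shuffle: Tuple2<IndexedKey, value>
     *   after cogroup:     Tuple2<IndexedKey, Seq<Seq<Tuple>>>
     *   final output:      (key, iterator over the tagged, cogrouped values)
     */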
    @Override
    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                              POGlobalRearrangeSpark op) throws IOException {
        SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0);
        int parallelism = SparkPigContext.get().getParallelism(predecessors, op);

        // TODO: Figure out the tradeoff of using CoGroupedRDD (even for one
        // input) vs. groupBy (as in the commented-out code below) vs.
        // reduceByKey(). This is a pending task for Pig on Spark Milestone 1.
        // Once that is settled, we can allow custom partitioning for the
        // secondary sort case as well.
        //
        // if (predecessors.size() == 1) {
        //     // GROUP BY
        //     JavaPairRDD<Object, Iterable<Tuple>> prdd;
        //     if (op.isUseSecondaryKey()) {
        //         prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
        //     } else {
        //         JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
        //         prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
        //         prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
        //                 parallelism));
        //     }
        //     JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
        //     return jrdd2.rdd();
        // }
        //
        // if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
        //     return handleSecondarySort(predecessors.get(0), op, parallelism);
        // }
        List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs =
                new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();

        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
            rddPairs.add(handleSecondarySort(predecessors.get(0), op, parallelism).rdd());
        } else {
            // Map each predecessor to (IndexedKey, value) pairs so that all
            // inputs can be cogrouped on the same key type.
            for (RDD<Tuple> rdd : predecessors) {
                JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
                JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
                rddPairs.add(rddPair.rdd());
            }
        }

        // CoGroupedRDD is parameterized only by its key type; the value side
        // is exposed as an untyped Array[Iterable[_]], so both the input Seq
        // and the resulting RDD have to be cast through Object to reach the
        // Tuple2<IndexedKey, Seq<Seq<Tuple>>> element type used below.
        CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
                (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
                        .asScalaBuffer(rddPairs).toSeq()),
                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
                SparkUtil.getManifest(Object.class));

        RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
                (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
        return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
    }

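    /**
     * Handles the secondary sort case for a single predecessor: each tuple is
     * paired with a null value so the whole tuple can serve as the sort key,
     * the pairs are repartitioned and sorted within partitions by the
     * secondary sort order, and the sorted tuples are then re-keyed on
     * IndexedKey for the cogroup.
     */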
    private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
            RDD<Tuple> rdd, POGlobalRearrangeSpark op, int parallelism) {

        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
                SparkUtil.<Tuple, Object>getTuple2Manifest());

        JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
                SparkUtil.getManifest(Tuple.class),
                SparkUtil.getManifest(Object.class));

        // First sort the tuples by the secondary key, since useSecondaryKey
        // sort is enabled.
        JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
                new HashPartitioner(parallelism),
                new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
        JavaRDD<Tuple> jrdd = sorted.keys();
        JavaRDD<Tuple2<IndexedKey, Tuple>> jrddPair = jrdd.map(new ToKeyValueFunction(op));
        return jrddPair;
    }

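    /**
     * Strips the null values from the (tuple, null) pairs used while sorting,
     * yielding just the key tuples.
     */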
    private static class RemoveValueFunction implements
            FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {

        private class Tuple2TransformIterable implements Iterable<Tuple> {

            Iterator<Tuple2<Tuple, Object>> in;

            Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
                in = input;
            }

            public Iterator<Tuple> iterator() {
                return new IteratorTransform<Tuple2<Tuple, Object>, Tuple>(in) {
                    @Override
                    protected Tuple transform(Tuple2<Tuple, Object> next) {
                        return next._1();
                    }
                };
            }
        }

        @Override
        public Iterator<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
            return new Tuple2TransformIterable(input).iterator();
        }
    }

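    /**
     * Pairs each tuple with a null value so that the tuple itself can act as
     * the key of a JavaPairRDD for repartitionAndSortWithinPartitions.
     */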
    private static class ToKeyNullValueFunction extends
            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
            Serializable {

        @Override
        public Tuple2<Tuple, Object> apply(Tuple t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("ToKeyNullValueFunction in " + t);
            }

            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
            if (LOG.isDebugEnabled()) {
                LOG.debug("ToKeyNullValueFunction out " + out);
            }

            return out;
        }
    }

    /**
     * Function that extracts the key from a locally rearranged tuple.
     */
    private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
        public final POGlobalRearrangeSpark glrSpark;

        public GetKeyFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
            this.glrSpark = globalRearrangeSpark;
        }

        public Object call(Tuple t) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("GetKeyFunction in " + t);
                }

                Object key;
                if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
                    // With secondary sort, position 1 holds the compound
                    // (main key, secondary key) tuple; extract the main key.
                    key = ((Tuple) t.get(1)).get(0);
                } else {
                    key = t.get(1);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("GetKeyFunction out " + key);
                }

                return key;
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Function that converts elements of a PairRDD into elements of a regular
     * RDD.
     * - Input (PairRDD) elements have the form
     *   Tuple2<key, Iterable<(index, key, value)>>.
     * - Output (regular RDD) elements have the form
     *   (key, iterator over (index, key, value) tuples).
     */
    private static class GroupTupleFunction
            implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
            Serializable {
        public final POGlobalRearrangeSpark glrSpark;

        public GroupTupleFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
            this.glrSpark = globalRearrangeSpark;
        }

        public Tuple call(Tuple2<Object, Iterable<Tuple>> v1) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("GroupTupleFunction in " + v1);
                }

                Tuple tuple = tf.newTuple(2);
                tuple.set(0, v1._1()); // key
                // v1._2() is an Iterable of (index, key, value) tuples, so
                // v1._2().iterator() iterates over the bag of values.
                tuple.set(1, v1._2().iterator());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("GroupTupleFunction out " + tuple);
                }

                return tuple;
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Converts an incoming locally rearranged tuple of the form
     * (index, key, value) into a Tuple2<IndexedKey, value> pair.
     */
    private static class ToKeyValueFunction implements
            Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {

        private POGlobalRearrangeSpark glrSpark = null;

        public ToKeyValueFunction(POGlobalRearrangeSpark glrSpark) {
            this.glrSpark = glrSpark;
        }

        public ToKeyValueFunction() {
        }

        @Override
        public Tuple2<IndexedKey, Tuple> call(Tuple t) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ToKeyValueFunction in " + t);
                }

                Object key;
                if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
                    // With secondary sort, position 1 holds the compound
                    // (main key, secondary key) tuple; use the main key.
                    key = ((Tuple) t.get(1)).get(0);
                } else {
                    key = t.get(1);
                }

                Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(
                        new IndexedKey((Byte) t.get(0), key),
                        (Tuple) t.get(2));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ToKeyValueFunction out " + out);
                }

                return out;
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Converts the cogroup output, where each element is a
     * Tuple2<IndexedKey, Seq<Seq<Tuple>>> (the key plus one bag of tuples per
     * input), into a Pig tuple of the form (key, iterator), where the
     * iterator chains the bags and tags each value tuple with the index of
     * the input it came from.
     */
    private static class ToGroupKeyValueFunction implements
            Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {

        @Override
        public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ToGroupKeyValueFunction in " + input);
                }

                final Object key = input._1().getKey();
                Object obj = input._2();
                // XXX this is a hack for Spark 1.1.0: the runtime type is an
                // Array, not a Seq, so cast accordingly.
                Seq<Tuple>[] bags = (Seq<Tuple>[]) obj;
                List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
                for (int i = 0; i < bags.length; i++) {
                    Seq<Tuple> bag = bags[i];
                    Iterator<Tuple> iterator = JavaConversions
                            .asJavaCollection(bag).iterator();
                    final int index = i;
                    tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
                            iterator) {
                        @Override
                        protected Tuple transform(Tuple next) {
                            try {
                                // (index, value); the third slot is left
                                // unset.
                                Tuple tuple = tf.newTuple(3);
                                tuple.set(0, index);
                                tuple.set(1, next);
                                return tuple;
                            } catch (ExecException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
                }

                Tuple out = tf.newTuple(2);
                out.set(0, key);
                out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ToGroupKeyValueFunction out " + out);
                }

                return out;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

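    /**
     * Chains a sequence of iterators into a single iterator that yields the
     * elements of each underlying iterator in turn.
     */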
    private static class IteratorUnion<T> implements Iterator<T> {

        private final Iterator<Iterator<T>> iterators;

        private Iterator<T> current;

        public IteratorUnion(Iterator<Iterator<T>> iterators) {
            this.iterators = iterators;
        }

        @Override
        public boolean hasNext() {
            if (current != null && current.hasNext()) {
                return true;
            } else if (iterators.hasNext()) {
                current = iterators.next();
                return hasNext();
            } else {
                return false;
            }
        }

        @Override
        public T next() {
            return current.next();
        }

        @Override
        public void remove() {
            current.remove();
        }

    }
}