/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import scala.Product2;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigConstants;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.rdd.RDD;
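
/**
 * Static helpers for Pig's Spark execution engine: ClassTag factories for
 * crossing the Java/Scala API boundary, JobConf construction, predecessor
 * sanity checks, partitioner selection, and merge-join indexer plumbing.
 */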
public class SparkUtil {
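    /**
     * Returns a Scala {@link ClassTag} for {@code clazz}. Spark's Scala APIs
     * take ClassTags implicitly; when called from Java they must be passed
     * explicitly, e.g. {@code SparkUtil.getManifest(Tuple.class)}.
     */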
    public static <T> ClassTag<T> getManifest(Class<T> clazz) {
        return ClassTag$.MODULE$.apply(clazz);
    }
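
    /**
     * Returns a ClassTag for {@code Tuple2<K, V>}. ClassTags do not track
     * type parameters at runtime, so the raw Tuple2 tag is cast through
     * Object; the cast is unchecked but safe.
     */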
    @SuppressWarnings("unchecked")
    public static <K, V> ClassTag<Tuple2<K, V>> getTuple2Manifest() {
        return (ClassTag<Tuple2<K, V>>) (Object) getManifest(Tuple2.class);
    }
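
    /**
     * Returns a ClassTag for {@code Product2<K, V>}, cast the same way as
     * {@link #getTuple2Manifest()} since type parameters are erased.
     */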
    @SuppressWarnings("unchecked")
    public static <K, V> ClassTag<Product2<K, V>> getProduct2Manifest() {
        return (ClassTag<Product2<K, V>>) (Object) getManifest(Product2.class);
    }
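
    /**
     * Creates the JobConf for a Spark job: copies the PigContext properties,
     * serializes the SparkEngineConf, the PigContext, and the UDF package
     * import list into the conf so executors can restore them, seeds the
     * attempt and job ids, and configures streaming directories for the
     * first POStore in the plan.
     */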
    public static JobConf newJobConf(PigContext pigContext, PhysicalPlan physicalPlan,
            SparkEngineConf sparkEngineConf) throws IOException {
        JobConf jobConf = new JobConf(
                ConfigurationUtil.toConfiguration(pigContext.getProperties()));
        // Serialize the thread-local variables UDFContext#udfConfs,
        // UDFContext#clientSysProps and PigContext#packageImportList
        // inside SparkEngineConf separately.
        jobConf.set("spark.engine.conf", ObjectSerializer.serialize(sparkEngineConf));
        // Serialize the PigContext so it is available in executor threads.
        jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
        // Serialize the thread-local variable inside PigContext separately.
        // Although PIG-4920 stores udf.import.list in SparkEngineConf, it must
        // also be stored in the jobConf because it is read in
        // PigOutputFormat#setupUdfEnvAndStores.
        jobConf.set("udf.import.list",
                ObjectSerializer.serialize(PigContext.getPackageImportList()));
        Random rand = new Random();
        jobConf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID,
                Integer.toString(rand.nextInt()));
        jobConf.set(PigConstants.LOCAL_CODE_DIR,
                System.getProperty("java.io.tmpdir"));
        jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
                physicalPlan, POStore.class);
        // PlanHelper may return an empty list, and getFirst() would then
        // throw, so guard with isEmpty() (getFirst() never returns null).
        if (!stores.isEmpty()) {
            POStore firstStore = stores.getFirst();
            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
                    jobConf);
        }
        return jobConf;
    }
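
    /**
     * Wraps a {@link java.util.List} as a Scala {@code Seq} without copying,
     * via {@code JavaConversions.asScalaBuffer}.
     */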
    public static <T> Seq<T> toScalaSeq(List<T> list) {
        return JavaConversions.asScalaBuffer(list);
    }
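
    /**
     * Throws a RuntimeException unless {@code physicalOperator} has exactly
     * {@code size} predecessor RDDs.
     */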
    public static void assertPredecessorSize(List<RDD<Tuple>> predecessors,
            PhysicalOperator physicalOperator, int size) {
        if (predecessors.size() != size) {
            throw new RuntimeException("Should have " + size
                    + " predecessors for " + physicalOperator.getClass()
                    + ". Got: " + predecessors.size());
        }
    }
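
    /**
     * Throws a RuntimeException unless {@code physicalOperator} has more than
     * {@code size} predecessor RDDs.
     */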
    public static void assertPredecessorSizeGreaterThan(
            List<RDD<Tuple>> predecessors, PhysicalOperator physicalOperator,
            int size) {
        if (predecessors.size() <= size) {
            throw new RuntimeException("Should have more than " + size
                    + " predecessors for " + physicalOperator.getClass()
                    + ". Got: " + predecessors.size());
        }
    }
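
    /**
     * Returns Spark's {@link HashPartitioner} when no custom partitioner is
     * configured; otherwise wraps the named MapReduce partitioner class in a
     * {@link MapReducePartitionerWrapper}.
     */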
    public static Partitioner getPartitioner(String customPartitioner, int parallelism) {
        if (customPartitioner == null) {
            return new HashPartitioner(parallelism);
        } else {
            return new MapReducePartitionerWrapper(customPartitioner, parallelism);
        }
    }

    /**
     * Creates an indexer node for {@code baseSparkOp}: appends a POSort over
     * a project-star expression (a single ascending sort column) as the leaf
     * of the operator's physical plan, so that the index tuples generated by
     * MergeJoinIndexer come out sorted. See PIG-4601 for details.
     */
    public static void createIndexerSparkNode(SparkOperator baseSparkOp, String scope,
            NodeIdGenerator nig) throws PlanException, ExecException {
        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
        PhysicalPlan ep = new PhysicalPlan();
        POProject prj = new POProject(new OperatorKey(scope,
                nig.getNextNodeId(scope)));
        prj.setStar(true);
        prj.setOverloaded(false);
        prj.setResultType(DataType.TUPLE);
        ep.add(prj);
        eps.add(ep);
        List<Boolean> ascCol = new ArrayList<Boolean>();
        ascCol.add(true);
        int requestedParallelism = baseSparkOp.requestedParallelism;
        // POSort sorts the index tuples generated by MergeJoinIndexer; see PIG-4601.
        POSort sort = new POSort(new OperatorKey(scope, nig.getNextNodeId(scope)),
                requestedParallelism, null, eps, ascCol, null);
        baseSparkOp.physicalPlan.addAsLeaf(sort);
    }
}