| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.backend.hadoop.executionengine.spark; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.net.URL; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.UUID; |
| import javax.xml.parsers.ParserConfigurationException; |
| import javax.xml.transform.TransformerException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.PigException; |
| import org.apache.pig.PigWarning; |
| import org.apache.pig.backend.BackendException; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; |
| import org.apache.pig.backend.hadoop.executionengine.Launcher; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; |
| import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.PoissonSampleConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.SparkSampleSortConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer; |
| import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer; |
| import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover; |
| import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter; |
| import org.apache.pig.data.SchemaTupleBackend; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.pig.impl.plan.PlanException; |
| import org.apache.pig.impl.plan.VisitorException; |
| import org.apache.pig.impl.util.JarManager; |
| import org.apache.pig.impl.util.ObjectSerializer; |
| import org.apache.pig.impl.util.UDFContext; |
| import org.apache.pig.impl.util.Utils; |
| import org.apache.pig.tools.pigstats.OutputStats; |
| import org.apache.pig.tools.pigstats.PigStats; |
| import org.apache.pig.tools.pigstats.spark.SparkCounterGroup; |
| import org.apache.pig.tools.pigstats.spark.SparkCounters; |
| import org.apache.pig.tools.pigstats.spark.SparkPigStats; |
| import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter; |
| import org.apache.spark.SparkConf; |
| import org.apache.spark.api.java.JavaSparkContext; |
| import org.apache.spark.scheduler.StatsReportListener; |
| |
| import com.google.common.base.Joiner; |
| |
| import static org.apache.pig.backend.hadoop.executionengine.spark.SparkShims.SPARK_VERSION; |
| |
| /** |
| * Main class that launches Pig on Spark. |
| */ |
| public class SparkLauncher extends Launcher { |
| |
| private static final Log LOG = LogFactory.getLog(SparkLauncher.class); |
| |
| // Our connection to Spark. It needs to be static so that it can be reused |
| // across jobs, because a new SparkLauncher gets created for each job. |
| private static JavaSparkContext sparkContext = null; |
| private static JobStatisticCollector jobStatisticCollector = new JobStatisticCollector(); |
| private String jobGroupID; |
| private PigContext pigContext = null; |
| private JobConf jobConf = null; |
| private String currentDirectoryPath = null; |
| private SparkEngineConf sparkEngineConf = new SparkEngineConf(); |
| private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName(); |
| |
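| /** |
| * Compiles the physical plan into a Spark plan, starts (or reuses) the shared SparkContext, |
| * registers the operator-to-RDD converters, uploads the job resources, then builds and runs |
| * the Spark job graph and returns the collected {@link SparkPigStats}. |
| */ |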
| @Override |
| public PigStats launchPig(PhysicalPlan physicalPlan, String grpName, |
| PigContext pigContext) throws Exception { |
| if (LOG.isDebugEnabled()) |
| LOG.debug(physicalPlan); |
| this.pigContext = pigContext; |
| initialize(physicalPlan); |
| SparkOperPlan sparkplan = compile(physicalPlan, pigContext); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(sparkplan); |
| } |
| SparkPigStats sparkStats = (SparkPigStats) pigContext |
| .getExecutionEngine().instantiatePigStats(); |
| sparkStats.initialize(pigContext, sparkplan, jobConf); |
| PigStats.start(sparkStats); |
| |
| startSparkIfNeeded(jobConf, pigContext); |
| |
| jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(), |
| UUID.randomUUID().toString()); |
| jobConf.set(MRConfiguration.JOB_ID,jobGroupID); |
| jobConf.set(MRConfiguration.TASK_ID, HadoopShims.getNewTaskAttemptID().toString()); |
| |
| sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", |
| false); |
| jobStatisticCollector.reset(); |
| |
| this.currentDirectoryPath = Paths.get(".").toAbsolutePath() |
| .normalize().toString() |
| + "/"; |
| |
| new ParallelismSetter(sparkplan, jobConf).visit(); |
| |
| prepareSparkCounters(jobConf); |
| |
| // Create the conversion map, mapping each Pig physical operator to its Spark converter |
| Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap |
| = new HashMap<Class<? extends PhysicalOperator>, RDDConverter>(); |
| convertMap.put(POLoad.class, new LoadConverter(pigContext, |
| physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf)); |
| convertMap.put(POStore.class, new StoreConverter(jobConf)); |
| convertMap.put(POForEach.class, new ForEachConverter(jobConf)); |
| convertMap.put(POFilter.class, new FilterConverter()); |
| convertMap.put(POPackage.class, new PackageConverter()); |
| convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter()); |
| convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter()); |
| convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter()); |
| convertMap.put(POLimit.class, new LimitConverter()); |
| convertMap.put(PODistinct.class, new DistinctConverter()); |
| convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc())); |
| convertMap.put(POSort.class, new SortConverter()); |
| convertMap.put(POSplit.class, new SplitConverter()); |
| convertMap.put(POSkewedJoin.class, new SkewedJoinConverter()); |
| convertMap.put(POMergeJoin.class, new MergeJoinConverter()); |
| convertMap.put(POCollectedGroup.class, new CollectedGroupConverter()); |
| convertMap.put(POCounter.class, new CounterConverter()); |
| convertMap.put(PORank.class, new RankConverter()); |
| convertMap.put(POStream.class, new StreamConverter()); |
| convertMap.put(POFRJoinSpark.class, new FRJoinConverter()); |
| convertMap.put(POMergeCogroup.class, new MergeCogroupConverter()); |
| convertMap.put(POReduceBySpark.class, new ReduceByConverter()); |
| convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter()); |
| convertMap.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext)); |
| convertMap.put(POSampleSortSpark.class, new SparkSampleSortConverter()); |
| convertMap.put(POPoissonSampleSpark.class, new PoissonSampleConverter()); |
| // Print the Spark plan before launching, if requested |
| Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); |
| if (conf.getBoolean(PigConfiguration.PIG_PRINT_EXEC_PLAN, false)) { |
| LOG.info(sparkplan); |
| } |
| uploadResources(sparkplan); |
| |
| new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobStatisticCollector, jobGroupID, jobConf, pigContext).visit(); |
| cleanUpSparkJob(sparkStats); |
| sparkStats.finish(); |
| resetUDFContext(); |
| return sparkStats; |
| } |
| |
| private void resetUDFContext() { |
| UDFContext.getUDFContext().addJobConf(null); |
| } |
| |
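| /** |
| * Ships the streaming/UDF files and jars required by the plan to the Spark job, either by |
| * copying them into the current working directory (local mode) or by registering them with |
| * the SparkContext (cluster mode). |
| */ |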
| private void uploadResources(SparkOperPlan sparkPlan) throws IOException { |
| addFilesToSparkJob(sparkPlan); |
| addJarsToSparkJob(sparkPlan); |
| } |
| |
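| /** |
| * Applies the Spark-specific optimizer passes (combiner, secondary key, accumulator, |
| * noop-filter removal, multi-query and join/group) to the compiled plan, honoring the |
| * corresponding pig.* configuration switches. |
| */ |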
| private void optimize(SparkOperPlan plan, PigContext pigContext) throws IOException { |
| |
| Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); |
| |
| // Should be the first optimizer as it introduces new operators to the plan. |
| boolean noCombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false); |
| if (!pigContext.inIllustrator && !noCombiner) { |
| CombinerOptimizer combinerOptimizer = new CombinerOptimizer(plan); |
| combinerOptimizer.visit(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("After combiner optimization:"); |
| LOG.debug(plan); |
| } |
| } |
| |
| boolean noSecondaryKey = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, false); |
| if (!pigContext.inIllustrator && !noSecondaryKey) { |
| SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan); |
| skOptimizer.visit(); |
| } |
| |
| boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true); |
| if (isAccum) { |
| AccumulatorOptimizer accum = new AccumulatorOptimizer(plan); |
| accum.visit(); |
| } |
| |
| // removes the filter(constant(true)) operators introduced by |
| // splits. |
| NoopFilterRemover fRem = new NoopFilterRemover(plan); |
| fRem.visit(); |
| |
| boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Before multiquery optimization:"); |
| LOG.debug(plan); |
| } |
| |
| if (isMultiQuery) { |
| // reduces the number of SparkOpers in the Spark plan generated |
| // by a multi-query (multi-store) script. |
| MultiQueryOptimizerSpark mqOptimizer = new MultiQueryOptimizerSpark(plan); |
| mqOptimizer.visit(); |
| } |
| |
| // JoinGroupOptimizerSpark collapses LRA+GLA+PKG into POJoinGroupSpark, while CombinerOptimizer |
| // collapses GLA+PKG into ReduceBy. If JoinGroupOptimizerSpark ran first, the Spark plan would |
| // already be changed and no longer suitable for CombinerOptimizer, so it must run after it. |
| // See PIG-4797 for details. |
| JoinGroupOptimizerSpark joinOptimizer = new JoinGroupOptimizerSpark(plan); |
| joinOptimizer.visit(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("After multiquery optimization:"); |
| LOG.debug(plan); |
| } |
| } |
| |
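| /** |
| * Post-job cleanup: in local mode removes the shipped/cached streaming files that were copied |
| * into the working directory, then invokes the StoreFunc cleanupOnSuccess/cleanupOnFailure |
| * hooks for every output. |
| */ |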
| private void cleanUpSparkJob(SparkPigStats sparkStats) throws ExecException { |
| LOG.info("Clean up Spark Job"); |
| boolean isLocal = System.getenv("SPARK_MASTER") != null ? System |
| .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true; |
| if (isLocal) { |
| String shipFiles = pigContext.getProperties().getProperty( |
| "pig.streaming.ship.files"); |
| if (shipFiles != null) { |
| for (String file : shipFiles.split(",")) { |
| File shipFile = new File(file); |
| File deleteFile = new File(currentDirectoryPath + "/" |
| + shipFile.getName()); |
| if (deleteFile.exists()) { |
| LOG.info(String.format("Delete ship file result: %b", |
| deleteFile.delete())); |
| } |
| } |
| } |
| String cacheFiles = pigContext.getProperties().getProperty( |
| "pig.streaming.cache.files"); |
| if (cacheFiles != null) { |
| for (String file : cacheFiles.split(",")) { |
| String fileName = extractFileName(file.trim()); |
| File deleteFile = new File(currentDirectoryPath + "/" |
| + fileName); |
| if (deleteFile.exists()) { |
| LOG.info(String.format("Delete cache file result: %b", |
| deleteFile.delete())); |
| } |
| } |
| } |
| } |
| |
| // run cleanup for all of the stores |
| for (OutputStats output : sparkStats.getOutputStats()) { |
| POStore store = output.getPOStore(); |
| try { |
| if (!output.isSuccessful()) { |
| store.getStoreFunc().cleanupOnFailure( |
| store.getSFile().getFileName(), |
| Job.getInstance(output.getConf())); |
| } else { |
| store.getStoreFunc().cleanupOnSuccess( |
| store.getSFile().getFileName(), |
| Job.getInstance(output.getConf())); |
| } |
| } catch (IOException e) { |
| throw new ExecException(e); |
| } catch (AbstractMethodError nsme) { |
| // Just swallow it. This means we're running against an |
| // older instance of a StoreFunc that doesn't implement |
| // this method. |
| } |
| } |
| } |
| |
| private void addFilesToSparkJob(SparkOperPlan sparkPlan) throws IOException { |
| LOG.info("Add files to Spark Job"); |
| String shipFiles = pigContext.getProperties().getProperty( |
| "pig.streaming.ship.files"); |
| shipFiles(shipFiles); |
| String cacheFiles = pigContext.getProperties().getProperty( |
| "pig.streaming.cache.files"); |
| cacheFiles(cacheFiles); |
| addUdfResourcesToSparkJob(sparkPlan); |
| } |
| |
| private void addUdfResourcesToSparkJob(SparkOperPlan sparkPlan) throws IOException { |
| SparkPOUserFuncVisitor sparkPOUserFuncVisitor = new SparkPOUserFuncVisitor(sparkPlan); |
| sparkPOUserFuncVisitor.visit(); |
| Joiner joiner = Joiner.on(","); |
| String shipFiles = joiner.join(sparkPOUserFuncVisitor.getShipFiles()); |
| shipFiles(shipFiles); |
| String cacheFiles = joiner.join(sparkPOUserFuncVisitor.getCacheFiles()); |
| cacheFiles(cacheFiles); |
| } |
| |
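| /** |
| * Ships a comma-separated list of local files to the Spark job, treating *.jar entries as jars. |
| */ |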
| private void shipFiles(String shipFiles) |
| throws IOException { |
| if (shipFiles != null && !shipFiles.isEmpty()) { |
| for (String file : shipFiles.split(",")) { |
| File shipFile = new File(file.trim()); |
| if (shipFile.exists()) { |
| addResourceToSparkJobWorkingDirectory(shipFile, shipFile.getName(), |
| shipFile.getName().endsWith(".jar") ? ResourceType.JAR : ResourceType.FILE ); |
| } |
| } |
| } |
| } |
| |
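| /** |
| * Handles a comma-separated list of cache file entries in the "url#name" form (for example |
| * "hdfs://nn/tmp/script.py#script.py", a hypothetical entry): each file is copied from its |
| * source file system into a local temp directory and shipped to the Spark job under the given name. |
| */ |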
| private void cacheFiles(String cacheFiles) throws IOException { |
| if (cacheFiles != null && !cacheFiles.isEmpty()) { |
| File tmpFolder = Files.createTempDirectory("cache").toFile(); |
| tmpFolder.deleteOnExit(); |
| for (String file : cacheFiles.split(",")) { |
| String fileName = extractFileName(file.trim()); |
| if( fileName != null) { |
| String fileUrl = extractFileUrl(file.trim()); |
| if( fileUrl != null) { |
| Path src = new Path(fileUrl); |
| File tmpFile = new File(tmpFolder, fileName); |
| Path tmpFilePath = new Path(tmpFile.getAbsolutePath()); |
| FileSystem fs = tmpFilePath.getFileSystem(jobConf); |
| //TODO:PIG-5241 Specify the hdfs path directly to spark and avoid the unnecessary download and upload in SparkLauncher.java |
| fs.copyToLocalFile(src, tmpFilePath); |
| tmpFile.deleteOnExit(); |
| LOG.info(String.format("CacheFile:%s", fileName)); |
| addResourceToSparkJobWorkingDirectory(tmpFile, fileName, |
| ResourceType.FILE); |
| } |
| } |
| } |
| } |
| } |
| |
| public static enum ResourceType { |
| JAR, |
| FILE |
| } |
| |
| |
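| /** |
| * Collects the default Pig jars, script jars, UDF jars, the generated script-UDF jar and any |
| * extra jars, and ships each of them to the Spark job working directory. |
| */ |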
| private void addJarsToSparkJob(SparkOperPlan sparkPlan) throws IOException { |
| Set<String> allJars = new HashSet<String>(); |
| LOG.info("Add default jars to Spark Job"); |
| allJars.addAll(JarManager.getDefaultJars()); |
| JarManager.addPigTestJarIfPresent(allJars); |
| LOG.info("Add script jars to Spark Job"); |
| for (String scriptJar : pigContext.scriptJars) { |
| allJars.add(scriptJar); |
| } |
| |
| LOG.info("Add udf jars to Spark Job"); |
| UDFJarsFinder udfJarsFinder = new UDFJarsFinder(sparkPlan, pigContext); |
| udfJarsFinder.visit(); |
| Set<String> udfJars = udfJarsFinder.getUdfJars(); |
| for (String udfJar : udfJars) { |
| allJars.add(udfJar); |
| } |
| |
| File scriptUDFJarFile = JarManager.createPigScriptUDFJar(pigContext); |
| if (scriptUDFJarFile != null) { |
| LOG.info("Add script udf jar to Spark job"); |
| allJars.add(scriptUDFJarFile.getAbsolutePath()); |
| } |
| |
| LOG.info("Add extra jars to Spark job"); |
| for (URL extraJarUrl : pigContext.extraJars) { |
| allJars.add(extraJarUrl.getFile()); |
| } |
| |
| //Upload all jars to spark working directory |
| for (String jar : allJars) { |
| File jarFile = new File(jar); |
| addResourceToSparkJobWorkingDirectory(jarFile, jarFile.getName(), |
| ResourceType.JAR); |
| } |
| } |
| |
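| /** |
| * Makes a single resource available to the job: in local mode the file is copied into the |
| * current working directory (guarded by a class-level lock), in cluster mode it is registered |
| * via SparkContext.addJar() or SparkContext.addFile(). |
| */ |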
| private void addResourceToSparkJobWorkingDirectory(File resourcePath, |
| String resourceName, ResourceType resourceType) throws IOException { |
| if (resourceType == ResourceType.JAR) { |
| LOG.info("Added jar " + resourceName); |
| } else { |
| LOG.info("Added file " + resourceName); |
| } |
| boolean isLocal = System.getenv("SPARK_MASTER") != null ? System |
| .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true; |
| if (isLocal) { |
| File localFile = new File(currentDirectoryPath + "/" |
| + resourceName); |
| if (resourcePath.getAbsolutePath().equals(localFile.getAbsolutePath()) |
| && resourcePath.exists()) { |
| return; |
| } |
| // When multiple threads start SparkLauncher, delete/copy actions should be in a critical section |
| synchronized(SparkLauncher.class) { |
| if (localFile.exists()) { |
| LOG.info(String.format( |
| "Jar file %s exists, ready to delete", |
| localFile.getAbsolutePath())); |
| localFile.delete(); |
| } else { |
| LOG.info(String.format("Local file %s does not exist", |
| localFile.getAbsolutePath())); |
| } |
| Files.copy(Paths.get(new Path(resourcePath.getAbsolutePath()).toString()), |
| Paths.get(localFile.getAbsolutePath())); |
| } |
| } else { |
| if(resourceType == ResourceType.JAR){ |
| sparkContext.addJar(resourcePath.toURI().toURL() |
| .toExternalForm()); |
| }else if( resourceType == ResourceType.FILE){ |
| sparkContext.addFile(resourcePath.toURI().toURL() |
| .toExternalForm()); |
| } |
| } |
| } |
| |
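| /** |
| * Returns the local name part of a cache file entry in the "url#name" form, or null if the |
| * entry does not contain exactly one '#'. |
| */ |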
| private String extractFileName(String cacheFileUrl) { |
| String[] tmpAry = cacheFileUrl.split("#"); |
| String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1] |
| : null; |
| return fileName; |
| } |
| |
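| /** |
| * Returns the source URL part of a cache file entry in the "url#name" form, or null otherwise. |
| */ |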
| private String extractFileUrl(String cacheFileUrl) { |
| String[] tmpAry = cacheFileUrl.split("#"); |
| String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0] |
| : null; |
| return fileName; |
| } |
| |
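| /** |
| * Compiles the physical plan into a {@link SparkOperPlan}, annotates the package operators for |
| * key-value optimization, and runs the Spark optimizer passes on the result. |
| */ |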
| public SparkOperPlan compile(PhysicalPlan physicalPlan, |
| PigContext pigContext) throws PlanException, IOException, |
| VisitorException { |
| SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan, |
| pigContext); |
| sparkCompiler.compile(); |
| sparkCompiler.connectSoftLink(); |
| SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan(); |
| |
| // optimize key-value handling in package |
| SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator( |
| sparkPlan); |
| pkgAnnotator.visit(); |
| |
| optimize(sparkPlan, pigContext); |
| return sparkPlan; |
| } |
| |
| |
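| /** |
| * Resolves the Spark master: "local" for a local exec type, otherwise the SPARK_MASTER |
| * environment variable, falling back to "local" when it is not set. |
| */ |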
| private static String getMaster(PigContext pc){ |
| String master = null; |
| if (pc.getExecType().isLocal()) { |
| master = "local"; |
| } else { |
| master = System.getenv("SPARK_MASTER"); |
| if (master == null) { |
| LOG.info("SPARK_MASTER not specified, using \"local\""); |
| master = "local"; |
| } |
| } |
| return master; |
| } |
| |
| /** |
| * Only one SparkContext may be active per JVM (SPARK-2243). When multiple threads start SparkLauncher, |
| * the static member sparkContext must be initialized in only one mode (local or cluster) at a time. |
| * |
| * If it was already initialized with a different mode than the one the new pigContext instance wants, |
| * the existing SparkContext is closed and re-initialized with the new mode. |
| */ |
| private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext pc) throws PigException { |
| String master = getMaster(pc); |
| if (sparkContext != null && !master.equals(sparkContext.master())){ |
| sparkContext.close(); |
| sparkContext = null; |
| } |
| if (sparkContext == null) { |
| String sparkHome = System.getenv("SPARK_HOME"); |
| if (!master.startsWith("local") && !master.equals("yarn-client")) { |
| // Check that SPARK_HOME is set when running on a Mesos cluster |
| if (sparkHome == null) { |
| System.err |
| .println("You need to set SPARK_HOME to run on a Mesos cluster!"); |
| throw new PigException("SPARK_HOME is not set"); |
| } |
| } |
| |
| SparkConf sparkConf = new SparkConf(); |
| Properties pigCtxtProperties = pc.getProperties(); |
| |
| sparkConf.setMaster(master); |
| sparkConf.setAppName(pigCtxtProperties.getProperty(PigContext.JOB_NAME,"pig")); |
| // On Spark 1.6, Netty file server doesn't allow adding the same file with the same name twice |
| // This is a problem when a streaming script is also explicitly shipped, i.e. the same file is added twice (PIG-5134) |
| // HTTP file server doesn't have this restriction, it overwrites the file if added twice |
| String useNettyFileServer = pigCtxtProperties.getProperty(PigConfiguration.PIG_SPARK_USE_NETTY_FILESERVER, "false"); |
| sparkConf.set("spark.rpc.useNettyFileServer", useNettyFileServer); |
| |
| if (sparkHome != null && !sparkHome.isEmpty()) { |
| sparkConf.setSparkHome(sparkHome); |
| } else { |
| LOG.warn("SPARK_HOME is not set"); |
| } |
| |
| //Copy all spark.* properties to SparkConf |
| for (String key : pigCtxtProperties.stringPropertyNames()) { |
| if (key.startsWith("spark.")) { |
| LOG.debug("Copying key " + key + " with value " + |
| pigCtxtProperties.getProperty(key) + " to SparkConf"); |
| sparkConf.set(key, pigCtxtProperties.getProperty(key)); |
| } |
| } |
| |
| // See PIG-5200 for why spark.executor.userClassPathFirst needs to be set to true in cluster modes |
| if (! "local".equals(master)) { |
| sparkConf.set("spark.executor.userClassPathFirst", "true"); |
| } |
| checkAndConfigureDynamicAllocation(master, sparkConf); |
| |
| sparkContext = new JavaSparkContext(sparkConf); |
| jobConf.set(SPARK_VERSION, sparkContext.version()); |
| SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener()); |
| SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener()); |
| } |
| } |
| |
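| /** |
| * If spark.dynamicAllocation.enabled is set, warns when the job is not running on YARN and |
| * makes sure the external shuffle service (spark.shuffle.service.enabled) is turned on, since |
| * dynamic allocation requires it. |
| */ |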
| private static void checkAndConfigureDynamicAllocation(String master, SparkConf sparkConf) { |
| if (sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) { |
| if (!master.startsWith("yarn")) { |
| LOG.warn("Dynamic allocation is enabled, but " + |
| "script isn't running on yarn. Ignoring ..."); |
| } |
| if (!sparkConf.getBoolean("spark.shuffle.service.enabled", false)) { |
| LOG.info("Spark shuffle service is being enabled as dynamic " + |
| "allocation is enabled"); |
| sparkConf.set("spark.shuffle.service.enabled", "true"); |
| } |
| } |
| } |
| |
| // You can use this in unit tests to stop the SparkContext between tests. |
| static void stopSpark() { |
| if (sparkContext != null) { |
| sparkContext.stop(); |
| sparkContext = null; |
| } |
| } |
| |
| |
| @Override |
| public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps, |
| String format, boolean verbose) throws IOException { |
| SparkOperPlan sparkPlan = compile(pp, pc); |
| explain(sparkPlan, ps, format, verbose); |
| } |
| |
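| /** |
| * Prints the Spark plan in the requested format: "text" (operator keys and their successors |
| * followed by the full plan dump), "dot" or "xml". |
| */ |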
| private void explain(SparkOperPlan sparkPlan, PrintStream ps, |
| String format, boolean verbose) |
| throws IOException { |
| Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys(); |
| List<OperatorKey> operKeyList = new ArrayList<>(allOperKeys.keySet()); |
| Collections.sort(operKeyList); |
| |
| if (format.equals("text")) { |
| for (OperatorKey operatorKey : operKeyList) { |
| SparkOperator op = sparkPlan.getOperator(operatorKey); |
| ps.print(op.getOperatorKey()); |
| List<SparkOperator> successors = sparkPlan.getSuccessors(op); |
| if (successors != null) { |
| ps.print("->"); |
| for (SparkOperator suc : successors) { |
| ps.print(suc.getOperatorKey() + " "); |
| } |
| } |
| ps.println(); |
| } |
| SparkPrinter printer = new SparkPrinter(ps, sparkPlan); |
| printer.setVerbose(verbose); |
| printer.visit(); |
| } else if (format.equals("dot")) { |
| ps.println("#--------------------------------------------------"); |
| ps.println("# Spark Plan"); |
| ps.println("#--------------------------------------------------"); |
| |
| DotSparkPrinter printer = new DotSparkPrinter(sparkPlan, ps); |
| printer.setVerbose(verbose); |
| printer.dump(); |
| ps.println(""); |
| } else if (format.equals("xml")) { |
| try { |
| XMLSparkPrinter printer = new XMLSparkPrinter(ps, sparkPlan); |
| printer.visit(); |
| printer.closePlan(); |
| } catch (ParserConfigurationException e) { |
| e.printStackTrace(); |
| } catch (TransformerException e) { |
| e.printStackTrace(); |
| } |
| } |
| else { |
| throw new IOException( |
| "Unsupported explain format. Supported formats are: text, dot, xml"); |
| } |
| } |
| |
| @Override |
| public void kill() throws BackendException { |
| if (sparkContext != null) { |
| sparkContext.stop(); |
| sparkContext = null; |
| } |
| } |
| |
| @Override |
| public void killJob(String jobID, Configuration conf) |
| throws BackendException { |
| if (sparkContext != null) { |
| sparkContext.stop(); |
| sparkContext = null; |
| } |
| } |
| |
| /** |
| * We store the value of udf.import.list in SparkEngineConf#properties. |
| * It is later serialized in SparkEngineConf#writeObject and deserialized in SparkEngineConf#readObject. |
| * See PIG-4920 for details. |
| */ |
| private void saveUdfImportList() { |
| String udfImportList = Joiner.on(",").join(PigContext.getPackageImportList()); |
| sparkEngineConf.setSparkUdfImportListStr(udfImportList); |
| } |
| |
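| /** |
| * Per-job initialization: records the UDF import list, builds the JobConf, initializes the |
| * SchemaTupleBackend and default time zone, and picks up spark.default.parallelism if set. |
| */ |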
| private void initialize(PhysicalPlan physicalPlan) throws IOException { |
| saveUdfImportList(); |
| jobConf = SparkUtil.newJobConf(pigContext, physicalPlan, sparkEngineConf); |
| SchemaTupleBackend.initialize(jobConf, pigContext); |
| Utils.setDefaultTimeZone(jobConf); |
| PigMapReduce.sJobConfInternal.set(jobConf); |
| String parallelism = pigContext.getProperties().getProperty("spark.default.parallelism"); |
| if (parallelism != null) { |
| SparkPigContext.get().setDefaultParallelism(Integer.parseInt(parallelism)); |
| } |
| } |
| |
| /** |
| * Creates a new SparkCounters instance for the job and initializes the aggregate warning counters if required. |
| * @param jobConf |
| * @throws IOException |
| */ |
| private static void prepareSparkCounters(JobConf jobConf) throws IOException { |
| SparkPigStatusReporter statusReporter = SparkPigStatusReporter.getInstance(); |
| SparkCounters counters = new SparkCounters(sparkContext); |
| |
| if ("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))) { |
| SparkCounterGroup pigWarningGroup = new SparkCounterGroup.MapSparkCounterGroup( |
| PIG_WARNING_FQCN, PIG_WARNING_FQCN,sparkContext |
| ); |
| pigWarningGroup.createCounter(PigWarning.SPARK_WARN.name(), new HashMap<String,Long>()); |
| pigWarningGroup.createCounter(PigWarning.SPARK_CUSTOM_WARN.name(), new HashMap<String,Long>()); |
| counters.getSparkCounterGroups().put(PIG_WARNING_FQCN, pigWarningGroup); |
| } |
| statusReporter.setCounters(counters); |
| jobConf.set("pig.spark.counters", ObjectSerializer.serialize(counters)); |
| } |
| } |