/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rdd.RDD
/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on the
 * partitioner specified in the ShuffleDependency) and writes them out for the shuffle.
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param stageAttemptId attempt id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param numPartitions total number of partitions in the stage that this task belongs to
 * @param locs preferred task execution locations for locality scheduling
 * @param artifacts list of artifacts (possibly session-specific) of the job this task belongs to
 * @param localProperties copy of thread-local properties set by the user on the driver side
 * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
 *                              and sent to the executor side
 *
 * The parameters below are optional:
 * @param jobId id of the job this task belongs to
 * @param appId id of the app this task belongs to
 * @param appAttemptId attempt id of the app this task belongs to
 * @param isBarrier whether this task belongs to a barrier stage. Spark must launch all the tasks
 *                  at the same time for a barrier stage.
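 *
 * A minimal sketch of constructing one of these tasks; the values passed in (`binary`, `part`,
 * `metricsBytes`, etc.) are illustrative placeholders that the scheduler would normally
 * prepare, not real fields:
 * {{{
 *   val task = new ShuffleMapTask(stageId = 1, stageAttemptId = 0, taskBinary = binary,
 *     partition = part, numPartitions = 8, locs = Nil, artifacts = artifacts,
 *     localProperties = new Properties(), serializedTaskMetrics = metricsBytes,
 *     jobId = Some(0))
 * }}}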
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    numPartitions: Int,
    @transient private var locs: Seq[TaskLocation],
    artifacts: JobArtifactSet,
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, numPartitions, artifacts,
    localProperties, serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)
  with Logging {

  /** A constructor used only in test suites. This does not require passing in an RDD. */
  def this(partitionId: Int) = {
    this(0, 0, null, new Partition { override def index: Int = 0 }, 1, null, null, new Properties,
      null)
  }

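  // De-duplicate the preferred locations; locs may be null when the test-only constructor
  // above was used, in which case there is no locality preference.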
  @transient private val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.distinct
  }

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD and its ShuffleDependency from the broadcast task binary.
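    // Record both wall-clock and CPU time spent deserializing so they can be reported in the
    // task's metrics; CPU time is only available when the thread MXBean supports it.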
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    // When the old shuffle fetch protocol is in use, the partitionId is used as the mapId in
    // the ShuffleBlockId construction; otherwise the globally unique task attempt id is used.
    val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
      partitionId
    } else {
      context.taskAttemptId()
    }
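    // Hand the partition's records to the shuffle writer, which writes them out and returns
    // the MapStatus that the scheduler uses to locate this task's map output.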
    dep.shuffleWriterProcessor.write(
      rdd.iterator(partition, context),
      dep,
      mapId,
      partitionId,
      context)
  }

  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = s"ShuffleMapTask($stageId, $partitionId)"
}