/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark;

import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.spark.Partitioner;

/**
* Spark Partitioner that wraps a custom partitioner that implements
* org.apache.hadoop.mapreduce.Partitioner interface.
*
 * Since Spark's shuffle API takes a different partitioner class
* (@see org.apache.spark.Partitioner) compared to MapReduce, we need to
* wrap custom partitioners written for MapReduce inside this Spark Partitioner.
*
 * MR custom partitioners are expected to implement getPartition() with
 * the following signature:
* public int getPartition(PigNullableWritable key, Writable value, int numPartitions)
* For an example of such a partitioner,
* @see org.apache.pig.test.utils.SimpleCustomPartitioner
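 *
 * A minimal sketch of such a partitioner (illustrative only; the modulo
 * hashing below is an assumed example, not SimpleCustomPartitioner's
 * actual logic):
 *
 *   public class ExampleModuloPartitioner
 *           extends org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable> {
 *       public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
 *           return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
 *       }
 *   }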
*/
public class MapReducePartitionerWrapper extends Partitioner {
private static final Log LOG = LogFactory.getLog(MapReducePartitionerWrapper.class);
private int numPartitions;
private String partitionerName;
    // MR's Partitioner interface is not serializable, so it cannot be
    // initialized in the constructor (which runs in Spark's DAG scheduler
    // thread on the Spark driver). To work around this, it is lazily
    // initialized inside the map task (executor thread) the first time
    // getPartition() is called.
    // mapredPartitioner is volatile so the double-checked locking in
    // getPartition() is safe: the write that publishes it must be fully
    // visible to other executor threads.
    private transient volatile org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>
            mapredPartitioner = null;
    private transient Method getPartitionMethod = null;

public MapReducePartitionerWrapper(String partitionerName,
int numPartitions) {
if (partitionerName == null) {
            throw new RuntimeException("MapReduce Partitioner class name cannot be null.");
}
this.partitionerName = partitionerName;
this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
return numPartitions;
    }

    @Override
    public int getPartition(final Object key) {
try {
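            // Wrap the raw key in a PigNullableWritable so the MR partitioner
            // receives the type it expects. For IndexedKey keys (produced by
            // the Spark converters), getValueAsPigType() unwraps the
            // underlying Pig key and copies the index onto this wrapper.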
PigNullableWritable writeableKey = new PigNullableWritable() {
public Object getValueAsPigType() {
if (key instanceof IndexedKey) {
IndexedKey indexedKey = (IndexedKey) key;
this.setIndex(indexedKey.getIndex());
return indexedKey.getKey();
} else {
return key;
}
}
};
// Lazy initialization
// Synchronized because multiple (map) tasks in the same Spark Executor
// may call getPartition, attempting to initialize at the same time.
            if (mapredPartitioner == null) {
                synchronized (this) {
                    // Check again in case another task initialized the
                    // partitioner while this thread waited on the lock.
                    if (mapredPartitioner == null) {
                        Class<?> mapredPartitionerClass =
                                PigContext.resolveClassName(partitionerName);
                        Configuration conf = new Configuration();
                        getPartitionMethod = mapredPartitionerClass.getMethod(
                                "getPartition",
                                PigNullableWritable.class,
                                org.apache.hadoop.io.Writable.class,
                                int.class);
                        // Publish the volatile guard field last, so any thread
                        // that sees a non-null partitioner also sees the
                        // resolved Method above.
                        mapredPartitioner =
                                (org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>)
                                        ReflectionUtils.newInstance(mapredPartitionerClass, conf);
                    }
                }
            }
            // MR Partitioner's getPartition() also takes a value argument, but
            // Spark's Partitioner only receives the key.
            // In practice, MR partitioners ignore the value; however, it's
            // possible that some don't.
            // TODO: We could handle this case by packaging the value inside the
            // key (conditioned on some config option, since this will balloon
            // memory usage). PIG-4575.
int partition = (Integer) getPartitionMethod.invoke(mapredPartitioner,
writeableKey, null, numPartitions);
return partition;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
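// A minimal usage sketch (illustrative only; the RDD variable "pairs" and the
// partition count are assumptions, not code from this module):
//
//   Partitioner partitioner = new MapReducePartitionerWrapper(
//           "org.apache.pig.test.utils.SimpleCustomPartitioner", 8);
//   // partitionBy() triggers a shuffle that routes every record through
//   // MapReducePartitionerWrapper.getPartition().
//   JavaPairRDD<PigNullableWritable, Tuple> shuffled = pairs.partitionBy(partitioner);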