blob: 578f042e8619da699862163dcd8f7c586081461b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch;
import java.io.Serializable;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import com.google.common.base.Preconditions;
/**
* Base class for all data processing functions in Crunch.
*
* <p>
* Note that all {@code DoFn} instances implement {@link Serializable}, and thus
* all of their non-transient member variables must implement
* {@code Serializable} as well. If your DoFn depends on non-serializable
* classes for data processing, they may be declared as {@code transient} and
* initialized in the DoFn's {@code initialize} method.
*
*/
public abstract class DoFn<S, T> implements Serializable {
/** This will be null prior to being set in {@link #setContext(TaskInputOutputContext)}. */
@CheckForNull
private transient TaskInputOutputContext<?, ?, ?, ?> context;
/** This will be null prior to being set in {@link #setConfiguration(Configuration)}. */
@CheckForNull
private transient Configuration conf;
/**
* Configure this DoFn. Subclasses may override this method to modify the
* configuration of the Job that this DoFn instance belongs to.
*
* <p>
* Called during the job planning phase by the crunch-client.
* </p>
*
* @param conf
* The Configuration instance for the Job.
*/
public void configure(Configuration conf) {
}
/**
* Initialize this DoFn. This initialization will happen before the actual
* {@link #process(Object, Emitter)} is triggered. Subclasses may override
* this method to do appropriate initialization.
*
* <p>
* Called during the setup of the job instance this {@code DoFn} is associated
* with.
* </p>
*
*/
public void initialize() {
}
/**
* Processes the records from a {@link PCollection}.
*
* <br/>
* <br/>
* <b>Note:</b> Crunch can reuse a single input record object whose content
* changes on each {@link #process(Object, Emitter)} method call. This
* functionality is imposed by Hadoop's <a href=
* "http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reducer.html"
* >Reducer</a> implementation: <i>The framework will reuse the key and value
* objects that are passed into the reduce, therefore the application should
* clone the objects they want to keep a copy of.</i>
*
* @param input
* The input record.
* @param emitter
* The emitter to send the output to
*/
public abstract void process(S input, Emitter<T> emitter);
/**
* Called during the cleanup of the MapReduce job this {@code DoFn} is
* associated with. Subclasses may override this method to do appropriate
* cleanup.
*
* @param emitter
* The emitter that was used for output
*/
public void cleanup(Emitter<T> emitter) {
}
/**
* Called during setup to pass the {@link TaskInputOutputContext} to this
* {@code DoFn} instance. The specified {@code TaskInputOutputContext} must not be null.
*/
public void setContext(@Nonnull TaskInputOutputContext<?, ?, ?, ?> context) {
Preconditions.checkNotNull(context);
this.context = context;
}
/**
* Called during the setup of an initialized {@link org.apache.crunch.types.PType} that
* relies on this instance.
*
* @param conf
* The non-null configuration for the {@code PType} being initialized
*/
public void setConfiguration(@Nonnull Configuration conf) {
Preconditions.checkNotNull(conf);
this.conf = conf;
}
/**
* Returns an estimate of how applying this function to a {@link PCollection}
* will cause it to change in side. The optimizer uses these estimates to
* decide where to break up dependent MR jobs into separate Map and Reduce
* phases in order to minimize I/O.
*
* <p>
* Subclasses of {@code DoFn} that will substantially alter the size of the
* resulting {@code PCollection} should override this method.
*/
public float scaleFactor() {
return 0.99f;
}
/**
* By default, Crunch will do a defensive deep copy of the outputs of a
* DoFn when there are multiple downstream consumers of that item, in order to
* prevent the downstream functions from making concurrent modifications to
* data objects. This introduces some extra overhead in cases where you know
* that the downstream code is only reading the objects and not modifying it,
* so you can disable this feature by overriding this function to
* return {@code true}.
*/
public boolean disableDeepCopy() {
return false;
}
protected TaskInputOutputContext<?, ?, ?, ?> getContext() {
return context;
}
protected Configuration getConfiguration() {
if (conf != null) {
return conf;
} else if (context != null) {
return context.getConfiguration();
} else {
return null;
}
}
/**
* @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2
* (from a class to an interface) so user programs should avoid this method and use
* one of the <code>increment</code> methods instead, such as {@link #increment(Enum)}.
*/
@Deprecated
protected Counter getCounter(Enum<?> counterName) {
if (context == null) {
return null;
}
return context.getCounter(counterName);
}
/**
* @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2
* (from a class to an interface) so user programs should avoid this method and use
* one of the <code>increment</code> methods instead, such as {@link #increment(Enum)}.
*/
@Deprecated
protected Counter getCounter(String groupName, String counterName) {
if (context == null) {
return null;
}
return context.getCounter(groupName, counterName);
}
protected void increment(String groupName, String counterName) {
increment(groupName, counterName, 1);
}
protected void increment(String groupName, String counterName, long value) {
if (context != null) {
context.getCounter(groupName, counterName).increment(value);
}
}
protected void increment(Enum<?> counterName) {
increment(counterName, 1);
}
protected void increment(Enum<?> counterName, long value) {
if (context != null) {
context.getCounter(counterName).increment(value);
}
}
protected void progress() {
if (context != null) {
context.progress();
}
}
protected TaskAttemptID getTaskAttemptID() {
if (context == null) {
return null;
}
return context.getTaskAttemptID();
}
protected void setStatus(String status) {
if (context != null) {
context.setStatus(status);
}
}
protected String getStatus() {
if (context == null) {
return null;
}
return context.getStatus();
}
}