CRUNCH-446: null checks inside of DoFns. Contributed by Allan Shoup.
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index dd2403f..89b5c11 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -65,6 +65,12 @@
</dependency>
<dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
index c2ed35d..578f042 100644
--- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
@@ -19,11 +19,16 @@
import java.io.Serializable;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import com.google.common.base.Preconditions;
+
/**
* Base class for all data processing functions in Crunch.
*
@@ -36,7 +41,11 @@
*
*/
public abstract class DoFn<S, T> implements Serializable {
+ /** This will be null prior to being set in {@link #setContext(TaskInputOutputContext)}. */
+ @CheckForNull
private transient TaskInputOutputContext<?, ?, ?, ?> context;
+ /** This will be null prior to being set in {@link #setConfiguration(Configuration)}. */
+ @CheckForNull
private transient Configuration conf;
/**
@@ -100,9 +109,10 @@
/**
* Called during setup to pass the {@link TaskInputOutputContext} to this
- * {@code DoFn} instance.
+ * {@code DoFn} instance. The specified {@code TaskInputOutputContext} must not be null.
*/
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ public void setContext(@Nonnull TaskInputOutputContext<?, ?, ?, ?> context) {
+ Preconditions.checkNotNull(context);
this.context = context;
}
@@ -110,9 +120,11 @@
* Called during the setup of an initialized {@link org.apache.crunch.types.PType} that
* relies on this instance.
*
- * @param conf The configuration for the {@code PType} being initialized
+ * @param conf
+ * The non-null configuration for the {@code PType} being initialized
*/
- public void setConfiguration(Configuration conf) {
+ public void setConfiguration(@Nonnull Configuration conf) {
+ Preconditions.checkNotNull(conf);
this.conf = conf;
}
@@ -164,6 +176,9 @@
*/
@Deprecated
protected Counter getCounter(Enum<?> counterName) {
+ if (context == null) {
+ return null;
+ }
return context.getCounter(counterName);
}
@@ -174,6 +189,9 @@
*/
@Deprecated
protected Counter getCounter(String groupName, String counterName) {
+ if (context == null) {
+ return null;
+ }
return context.getCounter(groupName, counterName);
}
@@ -182,7 +200,9 @@
}
protected void increment(String groupName, String counterName, long value) {
- context.getCounter(groupName, counterName).increment(value);
+ if (context != null) {
+ context.getCounter(groupName, counterName).increment(value);
+ }
}
protected void increment(Enum<?> counterName) {
@@ -190,22 +210,34 @@
}
protected void increment(Enum<?> counterName, long value) {
- context.getCounter(counterName).increment(value);
+ if (context != null) {
+ context.getCounter(counterName).increment(value);
+ }
}
protected void progress() {
- context.progress();
+ if (context != null) {
+ context.progress();
+ }
}
protected TaskAttemptID getTaskAttemptID() {
+ if (context == null) {
+ return null;
+ }
return context.getTaskAttemptID();
}
protected void setStatus(String status) {
- context.setStatus(status);
+ if (context != null) {
+ context.setStatus(status);
+ }
}
protected String getStatus() {
+ if (context == null) {
+ return null;
+ }
return context.getStatus();
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index d6065f9..3ba4dfb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -580,7 +580,7 @@
@Override
public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
for (MapFn fn : fns) {
- fn.setContext(getContext());
+ fn.setContext(context);
}
}
@@ -746,7 +746,7 @@
@Override
public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
for (MapFn fn : fns) {
- fn.setContext(getContext());
+ fn.setContext(context);
}
}
diff --git a/pom.xml b/pom.xml
index 8683080..8250712 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,7 @@
<scala.version>2.10.4</scala.version>
<scalatest.version>1.9.1</scalatest.version>
<spark.version>0.9.1</spark.version>
+ <jsr305.version>1.3.9</jsr305.version>
</properties>
<scm>
@@ -393,6 +394,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${jsr305.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>