blob: ea6629117d66552031a866551911b701052bceeb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.util;
import java.io.Serializable;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
/**
 * An extension of the {@code Tool} interface that creates a {@code Pipeline}
 * instance and provides methods for working with the Pipeline from inside of
 * the Tool's run method.
 *
 * <p>The pipeline is created in the constructor and kept in a {@code transient}
 * field, so it is not written out when an instance of this class is serialized.
 * On a deserialized instance the pipeline is therefore {@code null}:
 * <ul>
 *   <li>the pipeline-delegating methods throw {@link IllegalStateException};</li>
 *   <li>{@link #getConf()} falls back to the configuration stored by
 *       {@link Configured}, which may be {@code null};</li>
 *   <li>{@link #getPipeline()} returns {@code null}.</li>
 * </ul>
 */
public abstract class CrunchTool extends Configured implements Tool, Serializable {

  private static final long serialVersionUID = 1L;

  /** Convenience {@link From} instance available to subclasses. */
  protected static final From from = new From();

  /** Convenience {@link To} instance available to subclasses. */
  protected static final To to = new To();

  /** Convenience {@link At} instance available to subclasses. */
  protected static final At at = new At();

  // Pipeline object itself isn't necessarily serializable, so it is excluded from
  // serialization; it is null on deserialized instances.
  private transient Pipeline pipeline;

  /** Creates a tool backed by a new {@link MRPipeline}. */
  public CrunchTool() {
    this(false);
  }

  /**
   * Creates a tool backed by either an in-memory or a MapReduce pipeline.
   *
   * @param inMemory if {@code true}, use the pipeline returned by
   *     {@link MemPipeline#getInstance()}; otherwise create a new
   *     {@link MRPipeline} for this tool's runtime class
   */
  public CrunchTool(boolean inMemory) {
    this.pipeline = inMemory ? MemPipeline.getInstance() : new MRPipeline(getClass());
  }

  /**
   * Stores {@code conf} and, when both it and the pipeline are non-null, applies it
   * to the pipeline.
   *
   * <p>The pipeline can legitimately be {@code null} here, for example if a
   * superclass constructor invokes this method before this class's constructor has
   * assigned the field, or on a deserialized instance.
   *
   * @param conf the configuration to use; may be {@code null}
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf != null && pipeline != null) {
      pipeline.setConfiguration(conf);
    }
  }

  /**
   * Returns the pipeline's configuration. If the pipeline is unavailable (e.g. on a
   * deserialized instance), returns the configuration last passed to
   * {@link #setConf(Configuration)} instead, which may be {@code null}.
   *
   * @return the active configuration, or {@code null} if none is known
   */
  @Override
  public Configuration getConf() {
    // Falling back to Configured avoids an NPE when the transient pipeline is absent.
    return pipeline != null ? pipeline.getConfiguration() : super.getConf();
  }

  /**
   * Delegates to {@link Pipeline#enableDebug()}.
   *
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public void enableDebug() {
    requirePipeline().enableDebug();
  }

  /**
   * Delegates to {@link Pipeline#read(Source)}.
   *
   * @param source the source to read
   * @return the collection produced by the pipeline
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public <T> PCollection<T> read(Source<T> source) {
    return requirePipeline().read(source);
  }

  /**
   * Delegates to {@link Pipeline#read(TableSource)}.
   *
   * @param tableSource the table source to read
   * @return the table produced by the pipeline
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
    return requirePipeline().read(tableSource);
  }

  /**
   * Delegates to {@link Pipeline#readTextFile(String)}.
   *
   * @param pathName the path to read
   * @return the collection produced by the pipeline
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public PCollection<String> readTextFile(String pathName) {
    return requirePipeline().readTextFile(pathName);
  }

  /**
   * Delegates to {@link Pipeline#write(PCollection, Target)}.
   *
   * @param pcollection the collection to write
   * @param target the destination
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public void write(PCollection<?> pcollection, Target target) {
    requirePipeline().write(pcollection, target);
  }

  /**
   * Delegates to {@link Pipeline#writeTextFile(PCollection, String)}.
   *
   * @param pcollection the collection to write
   * @param pathName the destination path
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public void writeTextFile(PCollection<?> pcollection, String pathName) {
    requirePipeline().writeTextFile(pcollection, pathName);
  }

  /**
   * Delegates to {@link Pipeline#materialize(PCollection)}.
   *
   * @param pcollection the collection to materialize
   * @return the iterable returned by the pipeline
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
    return requirePipeline().materialize(pcollection);
  }

  /**
   * Delegates to {@link Pipeline#run()}.
   *
   * @return the result returned by the pipeline
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public PipelineResult run() {
    return requirePipeline().run();
  }

  /**
   * Delegates to {@link Pipeline#runAsync()}.
   *
   * @return the execution handle returned by the pipeline
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public PipelineExecution runAsync() {
    return requirePipeline().runAsync();
  }

  /**
   * Delegates to {@link Pipeline#done()}.
   *
   * @return the result returned by the pipeline
   * @throws IllegalStateException if the pipeline is unavailable
   */
  public PipelineResult done() {
    return requirePipeline().done();
  }

  /**
   * Returns the underlying pipeline.
   *
   * @return the pipeline, or {@code null} on a deserialized instance
   */
  protected Pipeline getPipeline() {
    return pipeline;
  }

  /** Returns the pipeline, failing with a descriptive error when it is absent. */
  private Pipeline requirePipeline() {
    if (pipeline == null) {
      throw new IllegalStateException(
          "Pipeline is not available on this CrunchTool instance; the pipeline field is "
              + "transient and is not restored after deserialization");
    }
    return pipeline;
  }
}