blob: 232d0a1b0cc72bac7a7fc0649ec5a65367e8919e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.materialize;
import java.io.IOException;
import java.util.Iterator;
import com.google.common.collect.Iterators;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.impl.FileSourceImpl;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A reference to the materialized output of a {@code PCollection} created
* by a subclass of {@code DistributedPipeline}.
*/
public class MaterializableIterable<E> implements Iterable<E> {
private static final Logger LOG = LoggerFactory.getLogger(MaterializableIterable.class);
private final Pipeline pipeline;
private final ReadableSource<E> source;
private Iterable<E> materialized;
private PipelineResult result;
public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) {
this.pipeline = pipeline;
this.source = source;
this.materialized = null;
}
/**
* Returns the backing {@code ReadableSource} for this instance.
*/
public ReadableSource<E> getSource() {
return source;
}
/**
* Indicates whether this instance is backed by a {@code SourceTarget}.
*/
public boolean isSourceTarget() {
return (source instanceof SourceTarget);
}
/**
* Returns the {@code Path} that contains this data, or null if no such path exists.
*/
public Path getPath() {
if (source instanceof FileSourceImpl) {
return ((FileSourceImpl) source).getPath();
}
if (source instanceof PathTarget) {
return ((PathTarget) source).getPath();
}
return null;
}
/**
* Returns the {@code PipelineResult} that was generated by the Pipeline execution that
* created this data. This result will only be non-empty if an actual pipeline execution was
* performed in order to generate this data, and it will only be non-null if this method is
* called after the data from this Iterable is retrieved.
*/
public PipelineResult getPipelineResult() {
return result;
}
@Override
public Iterator<E> iterator() {
if (materialized == null) {
this.result = pipeline.run();
if (result.succeeded() || !pipeline.getConfiguration().getBoolean("crunch.empty.materialize.on.failure", false)) {
materialize();
} else {
LOG.error("Pipeline run failed, returning empty iterator");
return Iterators.emptyIterator();
}
}
return materialized.iterator();
}
public void materialize() {
try {
materialized = source.read(pipeline.getConfiguration());
} catch (IOException e) {
LOG.error("Could not materialize: {}", source, e);
throw new CrunchRuntimeException(e);
}
}
}