blob: d0d96e8edadbbdfa1495b09bf5fd7c671c46867b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
/**
* The load operator which is used in two ways:
* 1) As a local operator it can be used to load files
* 2) In the Map Reduce setting, it is used to create jobs
* from MapReduce operators which keep the loads and
* stores in the Map and Reduce Plans till the job is created
*
*/
public class POLoad extends PhysicalOperator {
private static final Log log = LogFactory.getLog(POLoad.class);
/**
*
*/
private static final long serialVersionUID = 1L;
// The user defined load function or a default load function
private transient LoadFunc loader = null;
private transient LoadFuncDecorator lDecorator;
// The filespec on which the operator is based
FileSpec lFile;
// PigContext passed to us by the operator creator
PigContext pc;
//Indicates whether the loader setup is done or not
boolean setUpDone = false;
// Alias for the POLoad
private String signature;
// flag to distinguish user loads from MRCompiler loads.
private boolean isTmpLoad;
private long limit=-1;
private transient List<String> cacheFiles = null;
private transient List<String> shipFiles = null;
public POLoad(OperatorKey k) {
this(k,-1, null);
}
public POLoad(OperatorKey k, FileSpec lFile){
this(k,-1,lFile);
}
public POLoad(OperatorKey k, int rp, FileSpec lFile) {
super(k, rp);
this.lFile = lFile;
}
public POLoad(OperatorKey k, LoadFunc lf){
this(k);
this.loader = lf;
}
/**
* Set up the loader by
* 1) Instantiating the load func
* 2) Opening an input stream to the specified file and
* 3) Binding to the input stream at the specified offset.
* @throws IOException
*/
public void setUp() throws IOException{
loader = new ReadToEndLoader((LoadFunc)
PigContext.instantiateFuncFromSpec(lFile.getFuncSpec()),
ConfigurationUtil.toConfiguration(pc.getProperties()),
lFile.getFileName(),0, signature);
setLoadFuncDecorator(new LoadFuncDecorator(loader, signature));
}
/**
* At the end of processing, the inputstream is closed
* using this method
* @throws IOException
*/
public void tearDown() throws IOException{
setUpDone = false;
}
/**
* The main method used by this operator's successor
* to read tuples from the specified file using the
* specified load function.
*
* @return Whatever the loader returns
* A null from the loader is indicative
* of EOP and hence the tearDown of connection
*/
@Override
public Result getNextTuple() throws ExecException {
if(!setUpDone && lFile!=null){
try {
setUp();
} catch (IOException ioe) {
int errCode = 2081;
String msg = "Unable to setup the load function.";
throw new ExecException(msg, errCode, PigException.BUG, ioe);
}
setUpDone = true;
}
Result res = new Result();
try {
res.result = lDecorator.getNext();
if(res.result==null && !lDecorator.getErrorHandling()){
res.returnStatus = POStatus.STATUS_EOP;
tearDown();
}
else
res.returnStatus = POStatus.STATUS_OK;
if (res.returnStatus == POStatus.STATUS_OK)
res.result = illustratorMarkup(res, res.result, 0);
} catch (IOException e) {
log.error("Received error from loader function: " + e);
return res;
}
return res;
}
@Override
public String name() {
return (lFile != null) ? getAliasString() + "Load" + "(" + lFile.toString()
+ ")" + " - " + mKey.toString() : getAliasString() + "Load" + "("
+ "DummyFil:DummyLdr" + ")" + " - " + mKey.toString();
}
@Override
public boolean supportsMultipleInputs() {
return false;
}
@Override
public boolean supportsMultipleOutputs() {
return false;
}
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitLoad(this);
}
public FileSpec getLFile() {
return lFile;
}
public void setLFile(FileSpec file) {
lFile = file;
}
public void setIsTmpLoad(boolean tmp) {
isTmpLoad = tmp;
}
public boolean isTmpLoad() {
return isTmpLoad;
}
public PigContext getPc() {
return pc;
}
public void setPc(PigContext pc) {
this.pc = pc;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
public LoadFunc getLoadFunc(){
if (this.loader==null) {
this.loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
this.loader.setUDFContextSignature(signature);
}
if (lDecorator == null) {
setLoadFuncDecorator(new LoadFuncDecorator(loader, signature));
}
return this.loader;
}
void setLoadFuncDecorator(LoadFuncDecorator lDecorator) {
this.lDecorator = lDecorator;
}
public LoadFuncDecorator getLoadFuncDecorator() {
return lDecorator;
}
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
if (!illustrator.ceilingCheck()) {
((Result) in).returnStatus = POStatus.STATUS_EOP;
return null;
}
if (illustrator.getSchema() == null || illustrator.getSchema().size() <= ((Tuple) out).size()) {
boolean hasNull = false;
for (int i = 0; i < ((Tuple) out).size(); ++i)
try {
if (((Tuple) out).get(i) == null)
{
hasNull = true;
break;
}
} catch (ExecException e) {
hasNull = true;
break;
}
if (!hasNull) {
ExampleTuple tOut = new ExampleTuple((Tuple) out);
illustrator.getLineage().insert(tOut);
illustrator.addData((Tuple) tOut);
illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
return tOut;
} else
return (Tuple) out;
} else
return (Tuple) out;
} else
return (Tuple) out;
}
public long getLimit() {
return limit;
}
public void setLimit(long limit) {
this.limit = limit;
}
public List<String> getCacheFiles() {
return cacheFiles;
}
public void setCacheFiles(List<String> cf) {
cacheFiles = cf;
}
public List<String> getShipFiles() {
return shipFiles;
}
public void setShipFiles(List<String> sf) {
shipFiles = sf;
}
}