blob: 9ca65299086a358dc635740e09ac2eae2bcab7f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.relational;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.newplan.logical.Util;
public class LOLoad extends LogicalRelationalOperator {
private LogicalSchema scriptSchema;
private final FileSpec fs;
private transient LoadFunc loadFunc;
transient private Configuration conf;
private final LogicalSchema determinedSchema;
private List<Integer> requiredFields = null;
private boolean castInserted = false;
private LogicalSchema uidOnlySchema;
private final String schemaFile;
private final String signature;
private long limit = -1;
/**
* used for pattern matching
*
* @param schema schema user specified in script, or null if not
* specified.
* @param plan logical plan this load is part of.
*/
public LOLoad(LogicalSchema schema, LogicalPlan plan) {
this(null, schema, plan, null, null, null);
}
/**
* Used from the LogicalPlanBuilder
*
* @param loader FuncSpec for load function to use for this load.
* @param schema schema user specified in script, or null if not specified.
* @param plan logical plan this load is part of.
* @param conf
* @param loadFunc the LoadFunc that was instantiated from loader
* @param signature the signature that will be passed to the LoadFunc
*/
public LOLoad(FileSpec loader, LogicalSchema schema, LogicalPlan plan, Configuration conf, LoadFunc loadFunc, String signature) {
super("LOLoad", plan);
this.scriptSchema = schema;
this.fs = loader;
this.schemaFile = loader == null ? null : loader.getFileName();
this.conf = conf;
this.loadFunc = loadFunc;
this.signature = signature;
storeScriptSchema(conf, scriptSchema, signature);
if (loadFunc != null) {
this.loadFunc.setUDFContextSignature(signature);
try {
this.determinedSchema = getSchemaFromMetaData();
} catch (FrontendException e) {
throw new RuntimeException("Can not retrieve schema from loader " + loadFunc, e);
}
} else {
this.determinedSchema = null;
}
}
public String getSchemaFile() {
return schemaFile;
}
public LoadFunc getLoadFunc() throws FrontendException {
try {
if (loadFunc == null && fs!=null) {
loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
loadFunc.setUDFContextSignature(signature);
}
return loadFunc;
}catch (ClassCastException cce) {
throw new FrontendException(this, fs.getFuncSpec() + " should implement the LoadFunc interface.", 2236);
}
}
public void setScriptSchema(LogicalSchema schema) {
scriptSchema = schema;
}
public void setRequiredFields(List<Integer> requiredFields) {
this.requiredFields = requiredFields;
}
/**
* Get the schema for this load. The schema will be either be what was
* given by the user in the script or what the load functions getSchema
* call returned. Otherwise null will be returned, indicating that the
* schema is unknown.
* @return schema, or null if unknown
*/
@Override
public LogicalSchema getSchema() throws FrontendException {
if (schema != null)
return schema;
LogicalSchema originalSchema = null;
if (scriptSchema != null && determinedSchema != null) {
originalSchema = LogicalSchema.merge(scriptSchema, determinedSchema, LogicalSchema.MergeMode.LoadForEach);
} else if (scriptSchema != null) originalSchema = scriptSchema;
else if (determinedSchema != null) originalSchema = determinedSchema;
if (isCastInserted()) {
for (int i=0;i<originalSchema.size();i++) {
LogicalSchema.LogicalFieldSchema fs = originalSchema.getField(i);
if(determinedSchema == null) {
// Reset the loads field schema to byte array so that it
// will reflect reality.
fs.type = DataType.BYTEARRAY;
} else {
// Reset the type to what determinedSchema says it is
fs.type = determinedSchema.getField(i).type;
}
}
}
if (originalSchema!=null) {
uidOnlySchema = originalSchema.mergeUid(uidOnlySchema);
}
if (requiredFields!=null) {
schema = new LogicalSchema();
for (int i=0;i<originalSchema.size();i++) {
if (requiredFields.contains(i))
schema.addField(originalSchema.getField(i));
}
} else
schema = originalSchema;
return schema;
}
private LogicalSchema getSchemaFromMetaData() throws FrontendException {
if (getLoadFunc()!=null && getLoadFunc() instanceof LoadMetadata) {
try {
ResourceSchema resourceSchema = ((LoadMetadata)loadFunc).getSchema(getFileSpec().getFileName(), new Job(conf));
Schema oldSchema = Schema.getPigSchema(resourceSchema);
return Util.translateSchema(oldSchema);
} catch (IOException e) {
throw new FrontendException( this, "Cannot get schema from loadFunc " + loadFunc.getClass().getName(), 2245, e);
}
}
return null;
}
/**
* This method will store the scriptSchema:Schema using ObjectSerializer to
* the current configuration.<br/>
* The schema can be retrieved by load functions or UDFs to know the schema
* the user entered in the as clause.<br/>
* The name format is:<br/>
*
* <pre>
* ${UDFSignature}.scriptSchema = ObjectSerializer.serialize(scriptSchema)
* </pre>
* <p/>
* Note that this is not the schema the load function returns but will
* always be the as clause schema.<br/>
* That is a = LOAD 'input' as (a:chararray, b:chararray)<br/>
* The schema wil lbe (a:chararray, b:chararray)<br/>
* <p/>
*
* TODO Find better solution to make script schema available to LoadFunc see
* https://issues.apache.org/jira/browse/PIG-1717
*/
private void storeScriptSchema(Configuration conf, LogicalSchema scriptSchema, String signature) {
if (conf != null && scriptSchema != null && signature != null) {
try {
conf.set(
Utils.getScriptSchemaKey(signature),
ObjectSerializer.serialize(Util.translateSchema(scriptSchema)));
} catch (IOException ioe) {
int errCode = 1018;
String msg = "Problem serializing script schema";
FrontendException fee = new FrontendException(this, msg, errCode,
PigException.INPUT, false, null, ioe);
throw new RuntimeException(fee);
}
}
}
public FileSpec getFileSpec() {
return fs;
}
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (!(v instanceof LogicalRelationalNodesVisitor)) {
throw new FrontendException("Expected LogicalPlanVisitor", 2223);
}
((LogicalRelationalNodesVisitor)v).visit(this);
}
public LogicalSchema getDeterminedSchema() {
return determinedSchema;
}
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (other != null && other instanceof LOLoad) {
LOLoad ol = (LOLoad)other;
if (!checkEquality(ol)) return false;
if (fs == null) {
if (ol.fs == null) {
return true;
}else{
return false;
}
}
return fs.equals(ol.fs);
} else {
return false;
}
}
public void setCastInserted(boolean flag) {
castInserted = flag;
}
public boolean isCastInserted() {
return castInserted;
}
public Configuration getConfiguration() {
return conf;
}
@Override
public void resetUid() {
uidOnlySchema = null;
}
@Override
public String toString(){
String str = super.toString();
return (str + "RequiredFields:" + requiredFields);
}
public String getSignature() {
return signature;
}
public LogicalSchema getScriptSchema() {
return scriptSchema;
}
public long getLimit() {
return limit;
}
public void setLimit(long limit) {
this.limit = limit;
}
}