blob: 97fec0d6a994f21a94734befde879854fd542a51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.visitor;
import java.io.IOException;
import java.util.List;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.PlanValidationException;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ScalarExpression;
import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.parser.LogicalPlanBuilder;
/**
* Logical plan visitor which handles scalar projections. It will find or create a LOStore
* and a soft link between the store operator to a scalar expression. It will also sync the file name
* between the store and scalar expression.
*/
public class ScalarVisitor extends AllExpressionVisitor {
private final PigContext pigContext;
private final String scope;
public ScalarVisitor(OperatorPlan plan, PigContext pigContext, String scope) throws FrontendException {
super( plan, new DependencyOrderWalker( plan ) );
this.pigContext = pigContext;
this.scope = scope;
}
@Override
protected LogicalExpressionVisitor getVisitor(final LogicalExpressionPlan exprPlan)
throws FrontendException {
return new LogicalExpressionVisitor( exprPlan, new DependencyOrderWalker( exprPlan ) ) {
@Override
public void visit(ScalarExpression expr) throws FrontendException {
// This is a scalar udf.
ConstantExpression filenameConst = (ConstantExpression)exprPlan.getSuccessors( expr ).get( 1 );
Operator refOp = expr.getImplicitReferencedOperator();
Operator attachedOp = expr.getAttachedLogicalOperator();
LogicalPlan lp = (LogicalPlan) attachedOp.getPlan();
List<Operator> succs = lp.getSuccessors( refOp );
LOStore store = null;
FuncSpec interStorageFuncSpec = new FuncSpec(InterStorage.class.getName());
if( succs != null ) {
for( Operator succ : succs ) {
if( succ instanceof LOStore
&& ((LOStore)succ).isTmpStore()
&& interStorageFuncSpec.equals(
((LOStore)succ).getOutputSpec().getFuncSpec() ) ) {
store = (LOStore)succ;
break;
}
}
}
if( store == null ) {
FileSpec fileSpec;
try {
fileSpec = new FileSpec( FileLocalizer.getTemporaryPath( pigContext ).toString(), interStorageFuncSpec ); // TODO: need to hookup the pigcontext.
} catch (IOException e) {
throw new PlanValidationException( expr, "Failed to process scalar" + e);
}
StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec);
String sig = LogicalPlanBuilder.newOperatorKey(scope);
stoFunc.setStoreFuncUDFContextSignature(sig);
boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
store.setTmpStore(true);
lp.add( store );
lp.connect( refOp, store );
}
expr.setImplicitReferencedOperator(store);
filenameConst.setValue( store.getOutputSpec().getFileName() );
if( lp.getSoftLinkSuccessors( store ) == null ||
!lp.getSoftLinkSuccessors( store ).contains( attachedOp ) ) {
lp.createSoftLink( store, attachedOp );
}
}
};
}
}