/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.StringReader;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.backend.hadoop.PigATSClient;
import org.apache.pig.backend.hadoop.executionengine.HJob;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
import org.apache.pig.impl.io.compress.BZip2CodecWithExtensionBZ;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.UriUtil;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ScalarExpression;
import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalPlanData;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.parser.QueryParserDriver;
import org.apache.pig.parser.QueryParserUtils;
import org.apache.pig.pen.ExampleGenerator;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.validator.BlackAndWhitelistFilter;
import org.apache.pig.validator.PigCommandFilter;
import com.google.common.annotations.VisibleForTesting;
/**
*
* A class for Java programs to connect to Pig. Typically a program will create a PigServer
* instance. The programmer then registers queries using registerQuery() and
* retrieves results using openIterator() or store(). After doing so, the
* shutdown() method should be called to free any resources used by the current
* PigServer instance. Not doing so could result in a memory leak.
*
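* <p>A minimal usage sketch, where the input path is hypothetical:
* <pre>
* PigServer pigServer = new PigServer("local");
* pigServer.registerQuery("A = load 'input.txt' as (line:chararray);");
* Iterator&lt;Tuple&gt; it = pigServer.openIterator("A");
* while (it.hasNext()) {
*     System.out.println(it.next());
* }
* pigServer.shutdown();
* </pre>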
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class PigServer {
protected final Log log = LogFactory.getLog(getClass());
public static final String PRETTY_PRINT_SCHEMA_PROPERTY = "pig.pretty.print.schema";
private static final String PIG_LOCATION_CHECK_STRICT = "pig.location.check.strict";
/*
* The data structure to support grunt shell operations.
* The grunt shell can only work on one graph at a time.
* If a script is contained inside another script, the grunt
* shell first saves the current graph on the stack and works
* on a new graph. After the nested script is done, the grunt
* shell pops the saved graph off the stack and continues working on it.
*/
protected final Deque<Graph> graphs = new LinkedList<Graph>();
/*
* The current Graph the grunt shell is working on.
*/
private Graph currDAG;
protected final PigContext pigContext;
private String jobName;
private String jobPriority;
private final static AtomicInteger scopeCounter = new AtomicInteger(0);
protected final String scope = constructScope();
private boolean validateEachStatement = false;
private boolean skipParseInRegisterForBatch = false;
private final BlackAndWhitelistFilter filter;
private String constructScope() {
// scope serves for now as a session id
// String user = System.getProperty("user.name", "DEFAULT_USER_ID");
// String date = (new Date()).toString();
// scope is not really used in the system right now. It will
// however make your explain statements look lengthy if set to
// username-date. For now let's simplify the scope, if a real
// scope is needed again, we might need to update all the
// operators to not include scope in their name().
return "" + scopeCounter.incrementAndGet();
}
@VisibleForTesting
public static void resetScope() {
scopeCounter.set(0);
}
/**
* @param execTypeString can be 'mapreduce' or 'local'. Local mode will
* use Hadoop's local job runner to execute the job on the local machine.
* Mapreduce mode will connect to a cluster to execute the job. If
* execTypeString is not one of these two, Pig will try to deduce a matching
* ExecutionEngine from the classpath and use it for the backend execution.
* @throws ExecException
* @throws IOException
*/
public PigServer(String execTypeString) throws ExecException, IOException {
this(addExecTypeProperty(PropertiesUtil.loadDefaultProperties(), execTypeString));
}
public PigServer(String execTypeString, Properties properties) throws ExecException, IOException {
this(addExecTypeProperty(properties, execTypeString));
}
public PigServer(Properties properties) throws ExecException, IOException {
this(new PigContext(properties));
}
private static Properties addExecTypeProperty(Properties properties, String execType) {
properties.setProperty("exectype", execType);
return properties;
}
/**
* @param execType execution type to start the engine. Local mode will
* use Hadoop's local job runner to execute the job on the local machine.
* Mapreduce mode will connect to a cluster to execute the job.
* @throws ExecException
*/
public PigServer(ExecType execType) throws ExecException {
this(execType, PropertiesUtil.loadDefaultProperties());
}
public PigServer(ExecType execType, Properties properties) throws ExecException {
this(new PigContext(execType, properties));
}
public PigServer(ExecType execType, Configuration conf) throws ExecException {
this(new PigContext(execType, conf));
}
public PigServer(PigContext context) throws ExecException {
this(context, true);
}
public PigServer(PigContext context, boolean connect) throws ExecException {
this.pigContext = context;
currDAG = new Graph(false);
jobName = pigContext.getProperties().getProperty(
PigContext.JOB_NAME,
PigContext.JOB_NAME_PREFIX + ":DefaultJobName");
if (connect) {
pigContext.connect();
}
this.filter = new BlackAndWhitelistFilter(this);
addHadoopProperties();
addJarsFromProperties();
markPredeployedJarsFromProperties();
if (ScriptState.get() == null) {
// If Pig was started via command line, ScriptState should have been
// already initialized in Main. If so, we should not overwrite it.
ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
}
PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
// log ATS event includes the caller context
String auditId = PigATSClient.getPigAuditId(pigContext);
String callerId = (String)pigContext.getProperties().get(PigConfiguration.PIG_LOG_TRACE_ID);
log.info("Pig Script ID for the session: " + auditId);
if (callerId != null) {
log.info("Caller ID for session: " + callerId);
}
if (Boolean.parseBoolean(pigContext.getProperties()
.getProperty(PigConfiguration.PIG_ATS_ENABLED))) {
if (Boolean.parseBoolean(pigContext.getProperties()
.getProperty("yarn.timeline-service.enabled", "false"))) {
PigATSClient.ATSEvent event = new PigATSClient.ATSEvent(auditId, callerId);
try {
PigATSClient.getInstance().logEvent(event);
} catch (Exception e) {
log.warn("Error posting to ATS: ", e);
}
} else {
log.warn("ATS is disabled since"
+ " yarn.timeline-service.enabled set to false");
}
}
// set hdfs caller context
Class callerContextClass = null;
try {
callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext");
} catch (ClassNotFoundException e) {
// If pre-Hadoop 2.8.0, skip setting CallerContext
}
if (callerContextClass != null) {
try {
// Reflection for the following code since it is only available since hadoop 2.8.0:
// CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
// CallerContext.setCurrent(hdfsContext);
Class callerContextBuilderClass = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
Constructor callerContextBuilderConstruct = callerContextBuilderClass.getConstructor(String.class);
Object builder = callerContextBuilderConstruct.newInstance(auditId);
Method builderBuildMethod = builder.getClass().getMethod("build");
Object hdfsContext = builderBuildMethod.invoke(builder);
Method callerContextSetCurrentMethod = callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
callerContextSetCurrentMethod.invoke(callerContextClass, hdfsContext);
} catch (Exception e) {
// Shall not happen unless API change in future Hadoop commons
throw new ExecException(e);
}
}
}
private void addHadoopProperties() throws ExecException {
// For BZip input on hadoop 0.23/2.X
// with PIG_BZIP_USE_HADOOP_INPUTFORMAT turned on,
// PigTextInputFormat depends on hadoop's TextInputFormat
// for handling bzip2 input. One problem is it only recognizes 'bz2'
// as an extension and not 'bz'.
// Adding custom BZip2 codec that returns 'bz' as extension
// for backward compatibility.
String codecs =
pigContext.getProperties().getProperty("io.compression.codecs");
if( codecs != null
&& codecs.contains(BZip2Codec.class.getCanonicalName() ) ) {
pigContext.getProperties().setProperty("io.compression.codecs",
codecs + ","
+ BZip2CodecWithExtensionBZ.class.getCanonicalName() );
}
}
private void addJarsFromProperties() throws ExecException {
//add jars from properties to extraJars
String jar_str = pigContext.getProperties().getProperty("pig.additional.jars");
if (jar_str==null) {
jar_str = "";
}
jar_str = jar_str.replaceAll(File.pathSeparator, ",");
if (!jar_str.isEmpty()) {
jar_str += ",";
}
String jar_str_comma = pigContext.getProperties().getProperty("pig.additional.jars.uris");
if (jar_str_comma!=null && !jar_str_comma.isEmpty()) {
jar_str = jar_str + jar_str_comma;
}
if(jar_str != null && !jar_str.isEmpty()){
// pig.additional.jars uses File.pathSeparator (":" on Linux, ";" on Windows)
// as its separator; it was normalized to commas above so it can be
// combined with the comma-separated pig.additional.jars.uris.
for(String jar : jar_str.split(",")){
try {
registerJar(jar);
} catch (IOException e) {
int errCode = 4010;
String msg =
"Failed to register jar :" + jar + ". Caught exception.";
throw new ExecException(
msg,
errCode,
PigException.USER_ENVIRONMENT,
e
);
}
}
}
}
private void markPredeployedJarsFromProperties() throws ExecException {
// mark jars as predeployed from properties
String jar_str = pigContext.getProperties().getProperty("pig.predeployed.jars");
if(jar_str != null){
// Use File.pathSeparator (":" on Linux, ";" on Windows)
// to correctly handle path aggregates as they are represented
// on the Operating System.
for(String jar : jar_str.split(File.pathSeparator)){
if (jar.length() > 0) {
pigContext.markJarAsPredeployed(jar);
}
}
}
}
public PigContext getPigContext(){
return pigContext;
}
/**
* Current DAG
*
* @return the current {@link Graph}
*/
public Graph getCurrentDAG() {
return this.currDAG;
}
/**
* Set the logging level to DEBUG.
*/
public void debugOn() {
Logger.getLogger("org.apache.pig").setLevel(Level.DEBUG);
pigContext.getLog4jProperties().setProperty("log4j.logger.org.apache.pig", Level.DEBUG.toString());
}
/**
* Set the logging level to the default.
*/
public void debugOff() {
Logger.getLogger("org.apache.pig").setLevel(pigContext.getDefaultLogLevel());
pigContext.getLog4jProperties().setProperty("log4j.logger.org.apache.pig", pigContext.getDefaultLogLevel().toString());
}
/**
* Set the default parallelism for this job
* @param p default number of reducers to use for this job.
*/
public void setDefaultParallel(int p) {
pigContext.defaultParallel = p;
}
/**
* Starts batch execution mode.
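* <p>
* A minimal sketch of batch usage, where <tt>server</tt> is a PigServer
* instance and the paths are hypothetical:
* <pre>
* server.setBatchOn();
* server.registerQuery("A = load 'in.txt';");
* server.registerQuery("store A into 'out';");
* List&lt;ExecJob&gt; jobs = server.executeBatch();
* </pre>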
*/
public void setBatchOn() {
log.debug("Create a new graph.");
if (currDAG != null) {
graphs.push(currDAG);
}
currDAG = new Graph(true);
}
/**
* Retrieve the current execution mode.
*
* @return true if the execution mode is batch; false otherwise.
*/
public boolean isBatchOn() {
// Batch is on when there are multiple graphs on the
// stack. That gives the right response even if multiquery was
// turned off.
return graphs.size() > 0;
}
/**
* Returns whether the current batch is empty, i.e. whether there is nothing to process.
* @return true if there are no stores to process in the current
* batch, false otherwise.
* @throws FrontendException
*/
public boolean isBatchEmpty() throws FrontendException {
if (currDAG == null) {
int errCode = 1083;
String msg = "setBatchOn() must be called first.";
throw new FrontendException(msg, errCode, PigException.INPUT);
}
return currDAG.isBatchEmpty();
}
/**
* This method parses the script and builds the LogicalPlan. It should be
* followed by {@link PigServer#executeBatch(boolean)} with the argument
* set to false. Do not use {@link PigServer#executeBatch()} after
* calling this method, as that will re-parse and re-build the script.
*
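* <p>
* A minimal sketch of the intended call sequence, where <tt>server</tt> is a
* PigServer instance in batch mode and the paths are hypothetical:
* <pre>
* server.setBatchOn();
* server.registerQuery("A = load 'in.txt';");
* server.registerQuery("store A into 'out';");
* server.parseAndBuild();
* server.executeBatch(false);
* </pre>
*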
* @throws IOException
*/
public void parseAndBuild() throws IOException {
if (currDAG == null || !isBatchOn()) {
int errCode = 1083;
String msg = "setBatchOn() must be called first.";
throw new FrontendException(msg, errCode, PigException.INPUT);
}
currDAG.parseQuery();
currDAG.buildPlan( null );
}
/**
* Submits a batch of Pig commands for execution.
*
* @return list of jobs being executed
* @throws IOException
*/
public List<ExecJob> executeBatch() throws IOException {
return executeBatch(true);
}
/**
* Submits a batch of Pig commands for execution. If the user has already
* called {@link PigServer#parseAndBuild()}, the parse and build steps should
* be skipped by passing false as the argument.
*
* @param parseAndBuild whether to parse and build the script before execution
* @return list of jobs being executed
* @throws IOException
*/
public List<ExecJob> executeBatch(boolean parseAndBuild) throws IOException {
if (parseAndBuild) {
parseAndBuild();
}
PigStats stats = execute();
return getJobs(stats);
}
/**
* Retrieves a list of Job objects from the PigStats object
* @param stats the PigStats object to extract the jobs from
* @return A list of ExecJob objects
*/
protected List<ExecJob> getJobs(PigStats stats) {
LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
if (stats instanceof EmptyPigStats) {
HJob job = new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, stats.result(null)
.getPOStore(), null);
jobs.add(job);
return jobs;
}
JobGraph jGraph = stats.getJobGraph();
Iterator<JobStats> iter = jGraph.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
for (OutputStats output : js.getOutputs()) {
if (js.isSuccessful()) {
jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, output
.getPOStore(), output.getAlias(), stats));
} else {
HJob hjob = new HJob(HJob.JOB_STATUS.FAILED, pigContext, output
.getPOStore(), output.getAlias(), stats);
hjob.setException(js.getException());
jobs.add(hjob);
}
}
}
return jobs;
}
/**
* Discards a batch of Pig commands.
*
* @throws FrontendException
*/
public void discardBatch() throws FrontendException {
if (currDAG == null || !isBatchOn()) {
int errCode = 1083;
String msg = "setBatchOn() must be called first.";
throw new FrontendException(msg, errCode, PigException.INPUT);
}
currDAG = graphs.pop();
}
/**
* Add a path to be skipped while automatically shipping binaries for
* streaming.
*
* @param path path to be skipped
*/
public void addPathToSkip(String path) {
pigContext.addPathToSkip(path);
}
/**
* Defines an alias for the given function spec. This
* is useful for functions that require arguments to the
* constructor.
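* <p>
* A minimal sketch, where <tt>server</tt> is a PigServer instance and the
* UDF class name and constructor argument are hypothetical:
* <pre>
* server.registerFunction("myLookup",
*     new FuncSpec("com.example.MyLookup('cache.dat')"));
* </pre>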
*
* @param function - the new function alias to define.
* @param funcSpec - the FuncSpec object representing the name of
* the function class and any arguments to constructor.
*/
public void registerFunction(String function, FuncSpec funcSpec) {
pigContext.registerFunction(function, funcSpec);
}
/**
* Defines an alias for the given streaming command.
*
* @param commandAlias - the new command alias to define
* @param command - streaming command to be executed
*/
public void registerStreamingCommand(String commandAlias, StreamingCommand command) {
pigContext.registerStreamCmd(commandAlias, command);
}
private URL locateJarFromResources(String jarName) throws IOException {
Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
URL resourceLocation = null;
if (urls.hasMoreElements()) {
resourceLocation = urls.nextElement();
}
if (urls.hasMoreElements()) {
StringBuffer sb = new StringBuffer("Found multiple resources that match ");
sb.append(jarName);
sb.append(": ");
sb.append(resourceLocation);
while (urls.hasMoreElements()) {
sb.append(urls.nextElement());
sb.append("; ");
}
log.debug(sb.toString());
}
return resourceLocation;
}
/**
* Registers a jar file. Name of the jar file can be an absolute or
* relative path.
*
* If multiple resources are found with the specified name, the
* first one returned by getSystemResources is registered, and a
* message listing the other matches is logged.
*
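* <p>
* A minimal sketch, where <tt>server</tt> is a PigServer instance and the
* jar path is hypothetical:
* <pre>
* server.registerJar("/path/to/my-udfs.jar");
* </pre>
*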
* @param name of the jar file to register
* @throws IOException
*/
public void registerJar(String name) throws IOException {
// Check if this operation is permitted
filter.validate(PigCommandFilter.Command.REGISTER);
if (pigContext.hasJar(name)) {
log.debug("Ignoring duplicate registration for jar " + name);
return;
}
// first try to locate jar via system resources
// if this fails, try by using "name" as File (this preserves
// compatibility with case when user passes absolute path or path
// relative to current working directory.)
if (name != null) {
if (name.isEmpty()) {
log.warn("Empty string specified for jar path");
return;
}
URL resource = locateJarFromResources(name);
if (resource == null) {
FetchFileRet[] files = FileLocalizer.fetchFiles(pigContext.getProperties(), name);
for (FetchFileRet file : files) {
File f = file.file;
if (!f.canRead()) {
int errCode = 4002;
String msg = "Can't read jar file: " + name;
throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
}
pigContext.addJar(f.toURI().toURL(), name);
}
} else {
pigContext.addJar(resource, name);
}
}
}
/**
* Universal Scripting Language Support, see PIG-928
*
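* <p>
* A minimal sketch, where <tt>server</tt> is a PigServer instance and the
* script file and namespace are hypothetical:
* <pre>
* server.registerCode("my_udfs.py", "jython", "myfuncs");
* </pre>
*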
* @param path path of the script file
* @param scriptingLang language keyword or scriptingEngine used to interpret the script
* @param namespace namespace defined for functions of this script
* @throws IOException
*/
public void registerCode(String path, String scriptingLang, String namespace)
throws IOException {
if (pigContext.scriptingUDFs.containsKey(path) &&
pigContext.scriptingUDFs.get(path).equals(namespace)) {
log.debug("Ignoring duplicate registration for scripting udf file " + path + " in namespace " + namespace);
return;
} else {
pigContext.scriptingUDFs.put(path, namespace);
}
FetchFileRet ret = FileLocalizer.fetchFile(pigContext.getProperties(), path);
File f = ret.file;
if (!f.canRead()) {
int errCode = 4002;
String msg = "Can't read file: " + path;
throw new FrontendException(msg, errCode,
PigException.USER_ENVIRONMENT);
}
String cwd = new File(".").getCanonicalPath();
String filePath = f.getCanonicalPath();
String nameInJar = filePath;
// Use the relative path in the jar, if the path specified is relative
if (!ret.didFetch) {
if (!new File(path).isAbsolute() && path.indexOf("." + File.separator) == -1) {
// In case of Oozie, the localized files are in a different
// directory symlinked to the current directory. Canonical path will not point to cwd.
nameInJar = path;
} else if (filePath.equals(cwd + File.separator + path)) {
// If user specified absolute path and it refers to cwd
nameInJar = filePath.substring(cwd.length() + 1);
}
}
pigContext.addScriptFile(nameInJar, filePath);
if(scriptingLang != null) {
ScriptEngine se = ScriptEngine.getInstance(scriptingLang);
se.registerFunctions(nameInJar, namespace, pigContext);
}
}
/**
* Register a query with the Pig runtime. The query is parsed and registered, but it is not
* executed until it is needed.
*
* @param query
* a Pig Latin expression to be evaluated.
* @param startLine
* line number of the query within the whole script
* @throws IOException
*/
public void registerQuery(String query, int startLine) throws IOException {
currDAG.registerQuery(query, startLine, validateEachStatement, skipParseInRegisterForBatch);
}
/**
* Register a query with the Pig runtime. The query is parsed and registered, but it is not
* executed until it is needed. Equivalent to calling {@link #registerQuery(String, int)}
* with startLine set to 1.
*
* @param query
* a Pig Latin expression to be evaluated.
* @throws IOException
*/
public void registerQuery(String query) throws IOException {
registerQuery(query, 1);
}
/**
* Register a pig script from an InputStream source, which is more general and extensible.
* The pig script can be a local file (use a FileInputStream),
* can be built dynamically in memory (use a ByteArrayInputStream),
* or can even live on a remote machine (wrap it in a SocketInputStream).
* @param in the InputStream to read the pig script from
* @throws IOException
*/
public void registerScript(InputStream in) throws IOException{
registerScript(in, null, null);
}
/**
* Register a pig script from an InputStream source, which is more general and extensible.
* The pig script can be a local file (use a FileInputStream),
* can be built dynamically in memory (use a ByteArrayInputStream),
* or can even live on a remote machine (wrap it in a SocketInputStream).
* The parameters in the pig script will be substituted with the values in params.
* @param in the InputStream to read the pig script from
* @param params the key is the parameter name, and the value is the parameter value
* @throws IOException
*/
public void registerScript(InputStream in, Map<String,String> params) throws IOException{
registerScript(in, params, null);
}
/**
* Register a pig script from an InputStream source, which is more general and extensible.
* The pig script can be a local file (use a FileInputStream),
* can be built dynamically in memory (use a ByteArrayInputStream),
* or can even live on a remote machine (wrap it in a SocketInputStream).
* The parameters in the pig script will be substituted with the values in the parameter files.
* @param in the InputStream to read the pig script from
* @param paramsFiles files which have the parameter settings
* @throws IOException
*/
public void registerScript(InputStream in, List<String> paramsFiles) throws IOException {
registerScript(in, null, paramsFiles);
}
/**
* Register a pig script from an InputStream.<br>
* The pig script can be a local file (use a FileInputStream),
* can be built dynamically in memory (use a ByteArrayInputStream),
* or can even live on a remote machine (wrap it in a SocketInputStream).<br>
* The parameters in the pig script will be substituted with the values in the map and the parameter files.
* Values in the params map override values from the parameter files for parameters with the same name.
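* <p>
* A minimal sketch, where <tt>server</tt> is a PigServer instance and the
* paths and parameter names are hypothetical:
* <pre>
* Map&lt;String, String&gt; params = new HashMap&lt;String, String&gt;();
* params.put("input", "in.txt");
* InputStream in = new ByteArrayInputStream(
*     "A = load '$input'; store A into 'out';".getBytes());
* server.registerScript(in, params, null);
* </pre>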
* @param in
* @param params the key is the parameter name, and the value is the parameter value
* @param paramsFiles files which have the parameter setting
* @throws IOException
*/
public void registerScript(InputStream in, Map<String,String> params,List<String> paramsFiles) throws IOException {
try {
String script = IOUtils.toString(in);
ScriptState.get().setScript(script);
String substituted = pigContext.doParamSubstitution(new BufferedReader(new StringReader(script)),
paramMapToList(params), paramsFiles);
GruntParser grunt = new GruntParser(new StringReader(substituted), this);
grunt.setInteractive(false);
grunt.parseStopOnError(true);
} catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
log.error(e.getLocalizedMessage());
throw new IOException(e);
}
}
protected List<String> paramMapToList(Map<String, String> params) {
List<String> paramList = new ArrayList<String>();
if (params != null) {
for (Map.Entry<String, String> entry : params.entrySet()) {
paramList.add(entry.getKey() + "=" + entry.getValue());
}
}
return paramList;
}
/**
* Creates a clone of the current DAG
* @return A Graph object which is a clone of the current DAG
* @throws IOException
*/
protected Graph getClonedGraph() throws IOException {
Graph graph = currDAG.duplicate();
if (graph == null) {
int errCode = 2127;
String msg = "Cloning of plan failed.";
throw new FrontendException(msg, errCode, PigException.BUG);
}
return graph;
}
/**
* Register a query with the Pig runtime. The query will be read from the indicated file.
* @param fileName file to read query from.
* @throws IOException
*/
public void registerScript(String fileName) throws IOException {
registerScript(fileName, null, null);
}
/**
* Register a pig script file. The parameters in the file will be substituted with the values in params
* @param fileName pig script file
* @param params the key is the parameter name, and the value is the parameter value
* @throws IOException
*/
public void registerScript(String fileName, Map<String,String> params) throws IOException {
registerScript(fileName, params, null);
}
/**
* Register a pig script file. The parameters in the file will be substituted with the values in the parameter files
* @param fileName pig script file
* @param paramsFiles files which have the parameter setting
* @throws IOException
*/
public void registerScript(String fileName, List<String> paramsFiles) throws IOException {
registerScript(fileName, null, paramsFiles);
}
/**
* Register a pig script file. The parameters in the file will be substituted with the values in the map and the parameter files.
* Values in the params map override values from the parameter files for parameters with the same name.
* @param fileName pig script
* @param params the key is the parameter name, and the value is the parameter value
* @param paramsFiles files which have the parameter setting
* @throws IOException
*/
public void registerScript(String fileName, Map<String,String> params,List<String> paramsFiles) throws IOException {
FileInputStream fis = null;
try{
fis = new FileInputStream(fileName);
ScriptState.get().setFileName(fileName);
registerScript(fis, params, paramsFiles);
}catch (FileNotFoundException e){
log.error(e.getLocalizedMessage());
throw new IOException(e);
} finally {
if (fis != null) {
fis.close();
}
}
}
/**
* Intended to be used by unit tests only.
* Print a list of all aliases in the current Pig Latin script. Output is written to
* System.out.
* @throws FrontendException
*/
public void printAliases () throws FrontendException {
System.out.println("aliases: " + currDAG.getAliasOp().keySet());
}
/**
* Write the schema for an alias to System.out.
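* <p>
* A minimal sketch, where <tt>server</tt> is a PigServer instance and an
* alias named "A" is assumed to have been registered:
* <pre>
* Schema s = server.dumpSchema("A");
* </pre>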
* @param alias Alias whose schema will be written out
* @return Schema of alias dumped
* @throws IOException
*/
public Schema dumpSchema(String alias) throws IOException {
try {
pigContext.inDumpSchema = true;
if ("@".equals(alias)) {
alias = getLastRel();
}
LogicalRelationalOperator op = getOperatorForAlias( alias );
LogicalSchema schema = op.getSchema();
boolean pretty = "true".equals(pigContext.getProperties()
.getProperty(PRETTY_PRINT_SCHEMA_PROPERTY));
if (schema != null) {
Schema s = org.apache.pig.newplan.logical.Util.translateSchema(schema);
System.out.println(alias + ": " + (pretty ? s.prettyPrint() : s.toString()));
return s;
} else {
System.out.println("Schema for " + alias + " unknown.");
return null;
}
} catch (FrontendException fee) {
int errCode = 1001;
String msg = "Unable to describe schema for alias " + alias;
throw new FrontendException (msg, errCode, PigException.INPUT, false, null, fee);
} finally {
pigContext.inDumpSchema = false;
}
}
/**
* Write the schema for a nestedAlias to System.out. Denoted by
* alias::nestedAlias.
*
* @param alias Alias whose schema has nestedAlias
* @param nestedAlias Alias whose schema will be written out
* @return Schema of alias dumped
* @throws IOException
*/
public Schema dumpSchemaNested(String alias, String nestedAlias) throws IOException {
try {
pigContext.inDumpSchema = true;
if ("@".equals(alias)) {
alias = getLastRel();
}
Operator op = getOperatorForAlias( alias );
if( op instanceof LOForEach ) {
LogicalSchema nestedSc = ((LOForEach)op).dumpNestedSchema(alias, nestedAlias);
if (nestedSc!=null) {
Schema s = org.apache.pig.newplan.logical.Util.translateSchema(nestedSc);
System.out.println(alias+ "::" + nestedAlias + ": " + s.toString());
return s;
}
else {
System.out.println("Schema for "+ alias+ "::" + nestedAlias + " unknown.");
return null;
}
}
else {
int errCode = 1001;
String msg = "Unable to describe schema for " + alias + "::" + nestedAlias;
throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
}
} finally {
pigContext.inDumpSchema = false;
}
}
/**
* Set the name of the job. This name will get translated to mapred.job.name.
* @param name of job
*/
public void setJobName(String name) {
jobName = PigContext.JOB_NAME_PREFIX + ":" + name;
}
/**
* Set Hadoop job priority. This value will get translated to mapred.job.priority.
* @param priority valid values are found in {@link org.apache.hadoop.mapred.JobPriority}
*/
public void setJobPriority(String priority) {
jobPriority = priority;
}
/**
* Executes a Pig Latin script up to and including indicated alias. That is, if a user does:
* <pre>
* PigServer server = new PigServer();
* server.registerQuery("A = load 'foo';");
* server.registerQuery("B = filter A by $0 &gt; 0;");
* server.registerQuery("C = order B by $1;");
* </pre>
* Then
* <pre>
* server.openIterator("B");
* </pre>
* filtered but unsorted data will be returned. If instead a user does
* <pre>
* server.openIterator("C");
* </pre>
* filtered and sorted data will be returned.
* @param id Alias to open iterator for
* @return iterator of tuples returned from the script
* @throws IOException
*/
public Iterator<Tuple> openIterator(String id) throws IOException {
try {
pigContext.getProperties().setProperty( PigContext.JOB_NAME, jobName );
if( jobPriority != null ) {
pigContext.getProperties().setProperty( PigContext.JOB_PRIORITY, jobPriority );
}
ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
.toString(), Utils.getTmpFileCompressorName(pigContext)
+ "()");
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
return job.getResults();
} else if (job.getStatus() == JOB_STATUS.FAILED
&& job.getException() != null) {
// throw the backend exception in the failed case
Exception e = job.getException();
int errCode = 1066;
String msg = "Unable to open iterator for alias " + id
+ ". Backend error : " + e.getMessage();
throw new FrontendException(msg, errCode, PigException.INPUT, e);
} else {
throw new IOException("Job terminated with anomalous status "
+ job.getStatus().toString());
}
} catch (FrontendException e) {
throw e;
} catch (Exception e) {
int errCode = 1066;
String msg = "Unable to open iterator for alias " + id;
throw new FrontendException(msg, errCode, PigException.INPUT, e);
}
}
/**
* Executes a Pig Latin script up to and including indicated alias and stores the resulting
* records into a file. That is, if a user does:
* <pre>
* PigServer server = new PigServer();
* server.registerQuery("A = load 'foo';");
* server.registerQuery("B = filter A by $0 &gt; 0;");
* server.registerQuery("C = order B by $1;");
* </pre>
* Then
* <pre>
* server.store("B", "bar");
* </pre>
* filtered but unsorted data will be stored to the file <tt>bar</tt>. If instead a user does
* <pre>
* server.store("C", "bar");
* </pre>
* filtered and sorted data will be stored to the file <tt>bar</tt>.
* Equivalent to calling {@link #store(String, String, String)} with
* <tt>org.apache.pig.PigStorage</tt> as the store function.
* @param id The alias to store
* @param filename The file to store to
* @return {@link ExecJob} containing information about this job
* @throws IOException
*/
public ExecJob store(String id, String filename) throws IOException {
return store(id, filename, PigStorage.class.getName() + "()"); // PigStorage is the default store function
}
/**
* Executes a Pig Latin script up to and including indicated alias and stores the resulting
* records into a file. That is, if a user does:
* <pre>
* PigServer server = new PigServer();
* server.registerQuery("A = load 'foo';");
* server.registerQuery("B = filter A by $0 &gt; 0;");
* server.registerQuery("C = order B by $1;");
* </pre>
* Then
* <pre>
* server.store("B", "bar", "mystorefunc");
* </pre>
* filtered but unsorted data will be stored to the file <tt>bar</tt> using
* <tt>mystorefunc</tt>. If instead a user does
* <pre>
* server.store("C", "bar", "mystorefunc");
* </pre>
* filtered and sorted data will be stored to the file <tt>bar</tt> using
* <tt>mystorefunc</tt>.
* <p>
* @param id The alias to store
* @param filename The file to store to
* @param func store function to use
* @return {@link ExecJob} containing information about this job
* @throws IOException
*/
public ExecJob store(String id, String filename, String func)
throws IOException {
PigStats stats = storeEx(id, filename, func);
if (stats.getOutputStats().size() < 1) {
throw new IOException("Couldn't retrieve job.");
}
OutputStats output = stats.getOutputStats().get(0);
if(stats.isSuccessful()){
return new HJob(JOB_STATUS.COMPLETED, pigContext, output
.getPOStore(), output.getAlias(), stats);
}else{
HJob job = new HJob(JOB_STATUS.FAILED, pigContext,
output.getPOStore(), output.getAlias(), stats);
//check for exception
Exception ex = null;
for(JobStats js : stats.getJobGraph()){
if(js.getException() != null) {
ex = js.getException();
}
}
job.setException(ex);
return job;
}
}
private PigStats storeEx(String alias, String filename, String func)
throws IOException {
if ("@".equals(alias)) {
alias = getLastRel();
}
currDAG.parseQuery();
currDAG.skipStores(); // skip the stores that have already been processed
currDAG.buildPlan( alias );
try {
QueryParserUtils.attachStorePlan(scope, currDAG.lp, filename, func, currDAG.getOperator( alias ), alias, pigContext);
currDAG.compile();
return executeCompiledLogicalPlan();
} catch (PigException e) {
int errCode = 1002;
String msg = "Unable to store alias " + alias;
throw new PigException(msg, errCode, PigException.INPUT, e);
}
}
/**
* Provide information on how a pig query will be executed. For now
* this information is very developer focussed, and probably not very
* useful to the average user.
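* <p>
* A minimal sketch, where <tt>server</tt> is a PigServer instance and an
* alias named "C" is assumed to have been registered:
* <pre>
* server.explain("C", System.out);
* </pre>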
* @param alias Name of alias to explain.
* @param stream PrintStream to write explanation to.
* @throws IOException if the requested alias cannot be found.
*/
public void explain(String alias,
PrintStream stream) throws IOException {
explain(alias, "text", true, false, stream, stream, null, null);
}
/**
* Provide information on how a pig query will be executed.
* @param alias Name of alias to explain.
* @param format Format in which the explain should be printed. If text, then the plan will
* be printed in plain text. Otherwise, the execution plan will be printed in
* <a href="http://en.wikipedia.org/wiki/DOT_language">DOT</a> format.
* @param verbose Controls the amount of information printed
* @param markAsExecute When set, the explain is treated like a
* call to execute in the respect that all the pending stores are
* marked as complete.
* @param lps Stream to print the logical tree
* @param eps Stream to print the ExecutionEngine trees. If null, then will print to files
* @param dir Directory to print ExecutionEngine trees. If null, will use eps
* @param suffix Suffix of file names
* @throws IOException if the requested alias cannot be found.
*/
public void explain(String alias,
String format,
boolean verbose,
boolean markAsExecute,
PrintStream lps,
PrintStream eps,
File dir,
String suffix) throws IOException {
try {
pigContext.inExplain = true;
buildStorePlan( alias );
currDAG.lp.optimize(pigContext);
//Only add root xml node if all plans are being written to same stream.
if (format.equals("xml") && lps == eps) {
lps.println("<plan>");
}
currDAG.lp.explain(lps, format, verbose);
if( currDAG.lp.size() == 0 ) {
if (format.equals("xml") && lps == eps) {
lps.println("</plan>");
}
return;
}
pigContext.getExecutionEngine().explain(currDAG.lp, pigContext, eps, format, verbose, dir, suffix );
if (format.equals("xml") && lps == eps) {
lps.println("</plan>");
}
if (markAsExecute) {
currDAG.markAsExecuted();
}
} catch (Exception e) {
int errCode = 1067;
String msg = "Unable to explain alias " + alias;
throw new FrontendException(msg, errCode, PigException.INPUT, e);
} finally {
pigContext.inExplain = false;
}
}
/**
* Returns the unused byte capacity of an HDFS filesystem. This value does
* not take into account a replication factor, as that can vary from file
* to file. Thus if you are using this to determine if your data set will fit
* in the HDFS, you need to divide the result of this call by your specific replication
* setting.
* @return unused byte capacity of the file system.
* @throws IOException
*/
public long capacity() throws IOException {
if (pigContext.getExecType().isLocal()) {
throw new IOException("capacity only supported for non-local execution");
}
else {
DataStorage dds = pigContext.getDfs();
Map<String, Object> stats = dds.getStatistics();
String rawCapacityStr = (String) stats.get(DataStorage.RAW_CAPACITY_KEY);
String rawUsedStr = (String) stats.get(DataStorage.RAW_USED_KEY);
if ((rawCapacityStr == null) || (rawUsedStr == null)) {
throw new IOException("Failed to retrieve capacity stats");
}
long rawCapacityBytes = Long.parseLong(rawCapacityStr);
long rawUsedBytes = Long.parseLong(rawUsedStr);
return rawCapacityBytes - rawUsedBytes;
}
}
/**
* Returns the length of a file in bytes which exists in the HDFS (accounts for replication).
* @param filename
* @return length of the file in bytes
* @throws IOException
*/
public long fileSize(String filename) throws IOException {
DataStorage dfs = pigContext.getDfs();
ElementDescriptor elem = dfs.asElement(filename);
Map<String, Object> stats = elem.getStatistics();
long length = (Long) stats.get(ElementDescriptor.LENGTH_KEY);
int replication = (Short) stats
.get(ElementDescriptor.BLOCK_REPLICATION_KEY);
return length * replication;
}
/**
* Test whether a file exists.
* @param filename to test
* @return true if file exists, false otherwise
* @throws IOException
*/
public boolean existsFile(String filename) throws IOException {
ElementDescriptor elem = pigContext.getDfs().asElement(filename);
return elem.exists();
}
/**
* Delete a file.
* @param filename to delete
* @return true
* @throws IOException
*/
public boolean deleteFile(String filename) throws IOException {
// Check if this operation is permitted
filter.validate(PigCommandFilter.Command.RM);
filter.validate(PigCommandFilter.Command.RMF);
ElementDescriptor elem = pigContext.getDfs().asElement(filename);
elem.delete();
return true;
}
/**
* Rename a file.
* @param source file to rename
* @param target new file name
* @return true
* @throws IOException
*/
public boolean renameFile(String source, String target) throws IOException {
// Check if this operation is permitted
filter.validate(PigCommandFilter.Command.MV);
pigContext.rename(source, target);
return true;
}
/**
* Make a directory.
* @param dirs directory to make
* @return true
* @throws IOException
*/
public boolean mkdirs(String dirs) throws IOException {
// Check if this operation is permitted
filter.validate(PigCommandFilter.Command.MKDIR);
ContainerDescriptor container = pigContext.getDfs().asContainer(dirs);
container.create();
return true;
}
/**
* List the contents of a directory.
* @param dir name of directory to list
* @return array of strings, one for each file name
* @throws IOException
*/
public String[] listPaths(String dir) throws IOException {
// Check if this operation is permitted
filter.validate(PigCommandFilter.Command.LS);
Collection<String> allPaths = new ArrayList<String>();
ContainerDescriptor container = pigContext.getDfs().asContainer(dir);
Iterator<ElementDescriptor> iter = container.iterator();
while (iter.hasNext()) {
ElementDescriptor elem = iter.next();
allPaths.add(elem.toString());
}
String[] type = new String[1];
return allPaths.toArray(type);
}
/**
* Return a map containing the logical plan associated with each alias.
*
* @return map
*/
public Map<String, LogicalPlan> getAliases() {
Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>();
for (LogicalRelationalOperator op : currDAG.getAliases().keySet()) {
String alias = op.getAlias();
if(null != alias) {
aliasPlans.put(alias, currDAG.getAliases().get(op));
}
}
return aliasPlans;
}
/**
* Reclaims resources used by this instance of PigServer. This method
* deletes all temporary files generated by the current thread while
* executing Pig commands.
*/
public void shutdown() {
// clean-up activities
// TODO: reclaim scope to free up resources. Currently
// this is not implemented and throws an exception
// hence, for now, we won't call it.
//
// pigContext.getExecutionEngine().reclaimScope(this.scope);
FileLocalizer.deleteTempFiles();
}
/**
* Get the set of all current aliases.
* @return set
*/
public Set<String> getAliasKeySet() {
return currDAG.getAliasOp().keySet();
}
public Map<Operator, DataBag> getExamples(String alias) throws IOException {
try {
if (currDAG.isBatchOn() && alias != null) {
currDAG.parseQuery();
currDAG.buildPlan( null );
execute();
}
currDAG.parseQuery();
currDAG.skipStores();
currDAG.buildPlan( alias );
currDAG.compile();
} catch (IOException e) {
//Since the original script is parsed anyway, there should not be an
//error in this parsing. The only reason there can be an error is when
//the files being loaded in load don't exist anymore.
e.printStackTrace();
}
ExampleGenerator exgen = new ExampleGenerator( currDAG.lp, pigContext );
try {
return exgen.getExamples();
} catch (ExecException e) {
e.printStackTrace(System.out);
throw new IOException("ExecException" , e);
} catch (Exception e) {
e.printStackTrace(System.out);
throw new IOException("Exception ", e);
}
}
public void printHistory(boolean withNumbers) {
List<String> sc = currDAG.getScriptCache();
if(!sc.isEmpty()) {
for(int i = 0 ; i < sc.size(); i++) {
if(withNumbers) System.out.print((i+1)+" ");
System.out.println(sc.get(i));
}
}
}
private void buildStorePlan(String alias) throws IOException {
currDAG.parseQuery();
currDAG.buildPlan( alias );
if( !isBatchOn() || alias != null ) {
// MRCompiler needs a store to be the leaf - hence
// add a store to the plan to explain
QueryParserUtils.attachStorePlan(scope, currDAG.lp, "fakefile", null, currDAG.getOperator( alias ),
"fake", pigContext );
}
currDAG.compile();
}
/**
* Compile and execute the current plan.
* @return
* @throws IOException
*/
private PigStats execute() throws IOException {
pigContext.getProperties().setProperty( PigContext.JOB_NAME, jobName );
if( jobPriority != null ) {
pigContext.getProperties().setProperty( PigContext.JOB_PRIORITY, jobPriority );
}
// In this plan, all stores in the plan will be executed. They should be ignored if the plan is reused.
currDAG.countExecutedStores();
currDAG.compile();
if( currDAG.lp.size() == 0 ) {
return PigStats.get();
}
pigContext.getProperties().setProperty("pig.logical.plan.signature", currDAG.lp.getSignature());
PigStats stats = executeCompiledLogicalPlan();
return stats;
}
private PigStats executeCompiledLogicalPlan() throws ExecException,
FrontendException {
// discover pig features used in this script
ScriptState.get().setScriptFeatures(currDAG.lp);
currDAG.lp.optimize(pigContext);
return launchPlan(currDAG.lp, "job_pigexec_");
}
/**
* A common method for launching the jobs according to the logical plan
* @param lp The logical plan
* @param jobName A String containing the job name to be used
* @return The PigStats object
* @throws ExecException
* @throws FrontendException
*/
protected PigStats launchPlan(LogicalPlan lp, String jobName) throws ExecException, FrontendException {
PigStats stats = null;
try {
stats = pigContext.getExecutionEngine().launchPig(lp, jobName, pigContext);
} catch (ExecException e) {
throw e;
} catch (FrontendException e) {
throw e;
} catch (Exception e) {
// There are a lot of exceptions thrown by the launcher. If this
// is an ExecException, just let it through. Else wrap it.
int errCode = 2043;
String msg = "Unexpected error during execution.";
throw new ExecException(msg, errCode, PigException.BUG, e);
}
return stats;
}
/**
* NOTE: For testing only. Don't use.
* @throws IOException
*/
@SuppressWarnings("unused")
private LogicalPlan buildLp() throws IOException {
currDAG.buildPlan( null);
currDAG.compile();
return currDAG.lp;
}
private LogicalRelationalOperator getOperatorForAlias(String alias) throws IOException {
buildStorePlan (alias);
LogicalRelationalOperator op = (LogicalRelationalOperator)currDAG.getOperator( alias );
if( op == null ) {
int errCode = 1005;
String msg = "No plan for " + alias + " to describe";
throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
}
return op;
}
/**
* Returns data associated with LogicalPlan. It makes
* sense to call this method only after a query/script
* has been registered with one of the {@link #registerQuery(String)}
* or {@link #registerScript(InputStream)} methods.
*
* @return LogicalPlanData
*/
public LogicalPlanData getLogicalPlanData() {
return new LogicalPlanData(currDAG.lp);
}
/*
* This class holds the internal state of a grunt shell session.
*/
protected class Graph {
private final Map<LogicalRelationalOperator, LogicalPlan> aliases = new HashMap<LogicalRelationalOperator, LogicalPlan>();
private Map<String, Operator> operators = new HashMap<String, Operator>();
private String lastRel;
private final List<String> scriptCache = new ArrayList<String>();
// the fileNameMap contains filename to canonical filename
// mappings. This is done so we can reparse the cached script
// and remember the translation (the current directory might only
// be correct during the first parse).
private Map<String, String> fileNameMap = new HashMap<String, String>();
private final boolean batchMode;
private int processedStores = 0;
private LogicalPlan lp;
private int currentLineNum = 0;
public Graph(boolean batchMode) {
this.batchMode = batchMode;
this.lp = new LogicalPlan();
};
/**
* Call back method for counting executed stores.
*/
private void countExecutedStores() throws FrontendException {
List<LOStore> sinks = Util.getLogicalRelationalOperators(lp, LOStore.class);
processedStores += sinks.size();
}
Map<LogicalRelationalOperator, LogicalPlan> getAliases() {
return aliases;
}
Map<String, Operator> getAliasOp() {
return operators;
}
boolean isBatchOn() {
return batchMode;
};
boolean isBatchEmpty() {
for( Operator op : lp.getSinks() ) {
if( op instanceof LOStore )
return false;
}
return true;
}
void markAsExecuted() {
}
public LogicalPlan getLogicalPlan() {
return this.lp;
}
/**
* Get the operator with the given alias in the raw plan. Null if not
* found.
*/
Operator getOperator(String alias) throws FrontendException {
return operators.get( alias );
}
public LogicalPlan getPlan(String alias) throws IOException {
LogicalPlan plan = lp;
if (alias != null) {
LogicalRelationalOperator op = (LogicalRelationalOperator) operators.get(alias);
if(op == null) {
int errCode = 1003;
String msg = "Unable to find an operator for alias " + alias;
throw new FrontendException(msg, errCode, PigException.INPUT);
}
plan = aliases.get(op);
}
return plan;
}
/**
* Build a plan for the given alias. Extra branches and child branches under the alias
* will be ignored. Dependent branches (i.e. scalars) will be kept.
* @throws IOException
*/
void buildPlan(String alias) throws IOException {
if( alias == null )
skipStores();
final Queue<Operator> queue = new LinkedList<Operator>();
if( alias != null ) {
Operator op = getOperator( alias );
if (op == null) {
String msg = "Unable to find an operator for alias " + alias;
throw new FrontendException( msg, 1003, PigException.INPUT );
}
queue.add( op );
} else {
List<LOStore> stores = Util.getLogicalRelationalOperators(lp, LOStore.class);
for (LOStore op : stores) {
boolean addSink = true;
// Only add if all the successors are loads
List<Operator> succs = lp.getSuccessors(op);
if (succs != null && succs.size() > 0) {
for (Operator succ : succs) {
if (!(succ instanceof LOLoad)) {
addSink = false;
break;
}
}
}
if (addSink) {
queue.add(op);
}
}
}
LogicalPlan plan = new LogicalPlan();
while( !queue.isEmpty() ) {
Operator currOp = queue.poll();
plan.add( currOp );
List<Operator> preds = lp.getPredecessors( currOp );
if( preds != null ) {
List<Operator> ops = new ArrayList<Operator>( preds );
for( Operator pred : ops ) {
if( !queue.contains( pred ) )
queue.add( pred );
plan.connect( pred, currOp );
}
}
// visit expression associated with currOp. If it refers to any other operator
// that operator is also going to be enqueued.
currOp.accept( new AllExpressionVisitor( plan, new DependencyOrderWalker( plan ) ) {
@Override
protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan exprPlan)
throws FrontendException {
return new LogicalExpressionVisitor( exprPlan, new DependencyOrderWalker( exprPlan ) ) {
@Override
public void visit(ScalarExpression expr) throws FrontendException {
Operator refOp = expr.getImplicitReferencedOperator();
if( !queue.contains( refOp ) )
queue.add( refOp );
}
};
}
}
);
currOp.setPlan( plan );
}
lp = plan;
}
/**
* Remove stores that have been executed previously from the overall plan.
*/
private void skipStores() throws IOException {
// Get stores specifically
List<LOStore> sinks = Util.getLogicalRelationalOperators(lp, LOStore.class);
List<Operator> sinksToRemove = new ArrayList<Operator>();
int skipCount = processedStores;
if( skipCount > 0 ) {
for( LOStore sink : sinks ) {
sinksToRemove.add( sink );
skipCount--;
if( skipCount == 0 )
break;
}
}
for( Operator op : sinksToRemove ) {
// In the multiquery case it is entirely possible that
// a store is not a leaf (sink) and therefore has
// successors that need to be removed.
removeToLoad(op);
Operator pred = lp.getPredecessors( op ).get(0);
lp.disconnect( pred, op );
lp.remove( op );
}
}
private void removeToLoad(Operator toRemove) throws IOException {
List<Operator> successors = lp.getSuccessors(toRemove);
List<Operator> succToRemove = new ArrayList<Operator>();
if (successors != null && successors.size() > 0) {
succToRemove.addAll(successors);
for (Operator succ : succToRemove) {
lp.disconnect( toRemove, succ );
if (!(succ instanceof LOLoad)) {
removeToLoad(succ);
lp.remove(succ);
}
}
}
}
/**
* Accumulate the given statement to previous query statements and generate
* an overall (raw) plan.
*/
void registerQuery(String query, int startLine, boolean validateEachStatement,
boolean skipParseForBatch) throws IOException {
if( batchMode ) {
if( startLine == currentLineNum ) {
String line = scriptCache.remove( scriptCache.size() - 1 );
scriptCache.add( line + query );
} else {
while( startLine > currentLineNum + 1 ) {
scriptCache.add( "" );
currentLineNum++;
}
BufferedReader br = new BufferedReader(new StringReader(query));
String line = br.readLine();
while (line != null) {
scriptCache.add(line);
currentLineNum++;
line = br.readLine();
}
}
if (skipParseForBatch) {
return;
}
} else {
scriptCache.add( query );
}
if(validateEachStatement){
validateQuery();
}
parseQuery();
if( !batchMode ) {
buildPlan( null );
for( Operator sink : lp.getSinks() ) {
if( sink instanceof LOStore ) {
try {
execute();
} catch (Exception e) {
int errCode = 1002;
String msg = "Unable to store alias "
+ ((LOStore) sink).getAlias();
throw new FrontendException(msg, errCode,
PigException.INPUT, e);
}
break; // We should have at most one store, so break here.
}
}
}
}
private void validateQuery() throws FrontendException {
String query = buildQuery();
QueryParserDriver parserDriver = new QueryParserDriver( pigContext, scope, fileNameMap );
try {
LogicalPlan plan = parserDriver.parse( query );
plan.validate(pigContext, scope, true);
} catch(FrontendException ex) {
scriptCache.remove( scriptCache.size() -1 );
throw ex;
}
}
public List<String> getScriptCache() {
return scriptCache;
}
/**
* Parse the accumulated pig statements and generate an overall plan.
*/
private void parseQuery() throws FrontendException {
UDFContext.getUDFContext().reset();
UDFContext.getUDFContext().setClientSystemProps(pigContext.getProperties());
String query = buildQuery();
if( query.isEmpty() ) {
lp = new LogicalPlan();
return;
}
try {
QueryParserDriver parserDriver = new QueryParserDriver( pigContext, scope, fileNameMap );
lp = parserDriver.parse( query );
operators = parserDriver.getOperators();
lastRel = parserDriver.getLastRel();
} catch(Exception ex) {
scriptCache.remove( scriptCache.size() -1 ); // remove the bad script from the cache.
PigException pe = LogUtils.getPigException(ex);
int errCode = 1000;
String msg = "Error during parsing. "
+ (pe == null ? ex.getMessage() : pe.getMessage());
log.error("exception during parsing: " + msg, ex);
if (null == pe) {
throw new FrontendException (msg, errCode, PigException.INPUT , ex);
} else {
throw new FrontendException (msg, errCode, PigException.INPUT , ex, pe.getSourceLocation() );
}
}
}
public String getLastRel() {
return lastRel;
}
private String buildQuery() {
StringBuilder accuQuery = new StringBuilder();
for( String line : scriptCache ) {
accuQuery.append( line + "\n" );
}
return accuQuery.toString();
}
private void compile() throws IOException {
lp.validate(pigContext, scope, false);
currDAG.postProcess();
}
private void postProcess() throws IOException {
// The following code deals with store/load combination of
// intermediate files. In this case we will replace the load
// operator with an (implicit) split operator, iff the load/store
// func is reversible (because that's when we can safely
// skip the load and keep going with the split output). If
// the load/store func is not reversible (or they are
// different functions), we connect the store and the load
// to remember the dependency.
Set<LOLoad> loadOps = new HashSet<LOLoad>();
List<Operator> sources = lp.getSources();
for (Operator source : sources) {
if (source instanceof LOLoad) {
loadOps.add((LOLoad)source);
}
}
Set<LOStore> storeOps = new HashSet<LOStore>();
List<Operator> sinks = lp.getSinks();
for (Operator sink : sinks) {
if (sink instanceof LOStore) {
storeOps.add((LOStore)sink);
}
}
if ("true".equals(pigContext.getProperties().getProperty(PIG_LOCATION_CHECK_STRICT))) {
log.info("Output location strick check enabled");
checkDuplicateStoreLoc(storeOps);
}
for (LOLoad load : loadOps) {
for (LOStore store : storeOps) {
String ifile = load.getFileSpec().getFileName();
String ofile = store.getFileSpec().getFileName();
if (ofile.equals(ifile)) {
// if there is no path from the load to the store,
// then connect the store to the load to create the
// dependency of the store on the load. If there is
// a path from the load to the store, then we should
// not connect the store to the load and create a cycle
if (!store.getPlan().pathExists(load, store)) {
store.getPlan().connect(store, load);
}
}
}
}
}
/**
* This method checks whether multiple sinks (STOREs) use the same
* "file-based" location. If so, it throws a RuntimeException.
*
* @param storeOps
*/
private void checkDuplicateStoreLoc(Set<LOStore> storeOps) {
Set<String> uniqueStoreLoc = new HashSet<String>();
for(LOStore store : storeOps) {
String fileName = store.getFileSpec().getFileName();
if(!uniqueStoreLoc.add(fileName) && UriUtil.isHDFSFileOrLocalOrS3N(fileName, new Configuration(true))) {
throw new RuntimeException("Script contains 2 or more STORE statements writing to same location : "+ fileName);
}
}
}
protected Graph duplicate() {
// There are two choices on how we duplicate the logical plan
// 1 - we really clone each operator and connect up the cloned operators
// 2 - we cache away the script till the point we need to clone
// and then simply re-parse the script.
// The latter approach is used here
// FIXME: There is one open issue with this now:
// Consider the following script:
// A = load 'file:/somefile';
// B = filter A by $0 > 10;
// store B into 'bla';
// rm 'file:/somefile';
// A = load 'file:/someotherfile'
// when we try to clone, we re-parse from the beginning, and
// currently the parser checks for the existence of the files
// referenced in loads when the file is a local one, i.e. with
// the file: prefix.
// This will be a known issue now and we will need to revisit later
// parse each line of the cached script
int lineNumber = 1;
// create data structures needed for parsing
Graph graph = new Graph(isBatchOn());
graph.processedStores = processedStores;
graph.fileNameMap = new HashMap<String, String>(fileNameMap);
try {
for (Iterator<String> it = scriptCache.iterator(); it.hasNext(); lineNumber++) {
// always doing registerQuery irrespective of the batch mode
// TODO: Need to figure out if anything different needs to happen if batch
// mode is not on
// Don't have to do the validation again, so set validateEachStatement param to false
graph.registerQuery(it.next(), lineNumber, false, false);
}
graph.postProcess();
} catch (IOException ioe) {
ioe.printStackTrace();
graph = null;
}
return graph;
}
}
/**
* This can be called to indicate if the query is being parsed/compiled
* in a mode that expects each statement to be validated as it is
* entered, instead of just doing it once for the whole script.
* @param validateEachStatement
*/
public void setValidateEachStatement(boolean validateEachStatement) {
this.validateEachStatement = validateEachStatement;
}
/**
* Set whether to skip parsing while registering the query in batch mode
* @param skipParseInRegisterForBatch
*/
public void setSkipParseInRegisterForBatch(boolean skipParseInRegisterForBatch) {
this.skipParseInRegisterForBatch = skipParseInRegisterForBatch;
}
public String getLastRel() {
return currDAG.getLastRel();
}
public boolean isDebugOn() {
if (Logger.getLogger("org.apache.pig").getLevel() == Level.DEBUG) {
return true;
} else {
return false;
}
}
public String getJobName() {
return jobName;
}
public String getJobPriority() {
return jobPriority;
}
}