/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LODefine;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
import org.apache.pig.impl.logicalLayer.optimizer.streaming.LoadOptimizer;
import org.apache.pig.impl.logicalLayer.optimizer.streaming.StoreOptimizer;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.pen.DisplayExamples;
import org.apache.pig.pen.ExGen;
/**
*
* This class is the program's connection to Pig. Typically a program will create a PigServer
* instance. The programmer then registers queries using registerQuery() and
* retrieves results using openIterator() or store().
*
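* <p>A minimal sketch of typical use (the file names and query text are
* illustrative):
* <pre>{@code
* PigServer pigServer = new PigServer(PigServer.ExecType.MAPREDUCE);
* pigServer.registerQuery("A = LOAD 'input.txt' USING PigStorage();");
* pigServer.registerQuery("B = FOREACH A GENERATE $0;");
* Iterator<Tuple> it = pigServer.openIterator("B");
* }</pre>
*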
*/
public class PigServer {
private final Log log = LogFactory.getLog(getClass());
/** Holds the formatted output of the most recent showExamples() call. */
public String Result;
/**
* The type of query execution
*/
static public enum ExecType {
/**
* Run everything on the local machine
*/
LOCAL,
/**
* Use the Hadoop Map/Reduce framework
*/
MAPREDUCE,
/**
* Use the Experimental Hadoop framework; not available yet.
*/
PIG
}
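/**
* Parses an execution-type name into the corresponding {@link ExecType}.
* Accepted names (case-insensitive) are "local", "mapreduce", "mapred",
* "pig" and "pigbody".
*
* @param str the execution-type name to parse
* @throws IOException if the name is not recognized
*/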
public static ExecType parseExecType(String str) throws IOException {
String normStr = str.toLowerCase();
if (normStr.equals("local")) return ExecType.LOCAL;
if (normStr.equals("mapreduce")) return ExecType.MAPREDUCE;
if (normStr.equals("mapred")) return ExecType.MAPREDUCE;
if (normStr.equals("pig")) return ExecType.PIG;
if (normStr.equals("pigbody")) return ExecType.PIG;
throw new IOException("Unrecognized exec type: " + str);
}
Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
PigContext pigContext;
private String scope = constructScope();
private String constructScope() {
// the scope serves, for now, as a session id
// scope = user_id + "-" + time_stamp;
String user = System.getProperty("user.name", "DEFAULT_USER_ID");
String date = (new Date()).toString();
return user + "-" + date;
}
public PigServer(String execTypeString) throws ExecException, IOException {
this(parseExecType(execTypeString));
}
public PigServer(ExecType execType) throws ExecException {
this(execType, PropertiesUtil.loadPropertiesFromFile());
}
public PigServer(ExecType execType, Properties properties) throws ExecException {
this.pigContext = new PigContext(execType, properties);
if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
setJobName("DefaultJobName") ;
}
pigContext.connect();
}
public PigServer(PigContext context) throws ExecException {
this.pigContext = context;
if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
setJobName("DefaultJobName") ;
}
pigContext.connect();
}
public PigContext getPigContext(){
return pigContext;
}
public void debugOn() {
pigContext.debug = true;
}
public void debugOff() {
pigContext.debug = false;
}
/**
* Add a path to be skipped while automatically shipping binaries for
* streaming.
*
* @param path path to be skipped
*/
public void addPathToSkip(String path) {
pigContext.addPathToSkip(path);
}
/**
* Defines an alias for the given function spec. This
* is useful for functions that require arguments to the
* constructor.
*
* @param function - the new function alias to define.
* @param functionSpec - the name of the function and any arguments.
* It should have the form: classname('arg1', 'arg2', ...)
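*
* <p>A minimal sketch (the alias and the UDF class name are illustrative):
* <pre>{@code
* pigServer.registerFunction("myLookup",
*                            "com.example.udf.Lookup('dict.txt')");
* }</pre>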
*/
public void registerFunction(String function, String functionSpec) {
pigContext.registerFunction(function, functionSpec);
}
/**
* Defines an alias for the given streaming command.
*
* @param commandAlias - the new command alias to define
* @param command - streaming command to be executed
*/
public void registerStreamingCommand(String commandAlias, StreamingCommand command) {
pigContext.registerStreamCmd(commandAlias, command);
}
private URL locateJarFromResources(String jarName) throws IOException {
Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
URL resourceLocation = null;
if (urls.hasMoreElements()) {
resourceLocation = urls.nextElement();
}
if (pigContext.debug && urls.hasMoreElements()) {
String logMessage = "Found multiple resources that match "
+ jarName + ": " + resourceLocation + "; ";
while (urls.hasMoreElements()) {
logMessage += urls.nextElement() + "; ";
}
log.debug(logMessage);
}
return resourceLocation;
}
/**
* Registers a jar file. Name of the jar file can be an absolute or
* relative path.
*
* If multiple resources are found with the specified name, the first
* one returned by getSystemResources is registered, and a warning is
* logged to inform the user.
*
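* <p>A minimal sketch (the jar name is illustrative):
* <pre>{@code
* pigServer.registerJar("myudfs.jar");
* }</pre>
*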
* @param name path of the jar file to register
* @throws IOException
*/
public void registerJar(String name) throws IOException {
// first try to locate jar via system resources
// if this fails, try by using "name" as File (this preserves
// compatibility with case when user passes absolute path or path
// relative to current working directory.)
if (name != null) {
URL resource = locateJarFromResources(name);
if (resource == null) {
File f = new File(name);
if (!f.canRead()) {
throw new IOException("Can't read jar file: " + name);
}
resource = f.toURI().toURL();
}
pigContext.addJar(resource);
}
}
/**
* Register a query with the Pig runtime. The query is parsed and registered, but it is not
* executed until it is needed.
*
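* <p>A minimal sketch (the query text is illustrative):
* <pre>{@code
* pigServer.registerQuery("A = LOAD 'input.txt' USING PigStorage();");
* }</pre>
*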
* @param query
* a Pig Latin expression to be evaluated.
* @throws IOException
*/
public void registerQuery(String query) throws IOException {
// Bugzilla Bug 1006706 -- ignore empty queries
//=============================================
if (query != null) {
query = query.trim();
if (query.length() == 0) return;
} else {
return;
}
// parse the query into a logical plan
LogicalPlan lp = null;
LogicalOperator op = null;
try {
lp = (new LogicalPlanBuilder(pigContext).parse(scope, query, aliases, opTable));
op = opTable.get(lp.getRoot());
} catch (ParseException e) {
throw (IOException) new IOException(e.getMessage()).initCause(e);
}
if (lp.getAlias() != null) {
aliases.put(lp.getAlias(), lp);
}
// No need to do anything about DEFINE
if (op instanceof LODefine) {
return;
}
// Check if we just processed a LOStore i.e. STORE
if (op instanceof LOStore) {
try {
optimizeAndRunQuery(lp);
}
catch (ExecException e) {
throw WrappedIOException.wrap("Unable to store alias " +
lp.getAlias(), e);
}
}
}
public void dumpSchema(String alias) throws IOException {
LogicalPlan lp = aliases.get(alias);
if (lp == null)
throw new IOException("Invalid alias: " + alias);
TupleSchema schema = lp.getOpTable().get(lp.getRoot()).outputSchema();
System.out.println(schema.toString());
}
public void setJobName(String name) {
pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + name);
}
/**
* Forces execution of the query behind the given alias (and all queries
* it reads from), in order to materialize the result.
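*
* <p>A minimal sketch of consuming the results (the alias name is
* illustrative):
* <pre>{@code
* Iterator<Tuple> it = pigServer.openIterator("B");
* while (it.hasNext()) {
*     System.out.println(it.next());   // one result tuple per line
* }
* }</pre>
*
* @param id the alias whose result should be materialized
* @throws IOException if the alias is invalid or the job fails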
*/
public Iterator<Tuple> openIterator(String id) throws IOException {
if (!aliases.containsKey(id))
throw new IOException("Invalid alias: " + id);
// TODO: front-end could actually remember what logical plans have been
// already submitted to the back-end for compilation and
// execution.
LogicalPlan readFrom = aliases.get(id);
// Run
try {
ExecJob job = optimizeAndRunQuery(readFrom);
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
return job.getResults();
}
else {
throw new IOException("Job terminated with anomalous status " + job.getStatus().toString());
}
}
catch (ExecException e) {
throw WrappedIOException.wrap("Unable to open iterator for alias: " + id, e);
}
}
/**
* Store the contents of an alias into a file, using the default store
* function (PigStorage).
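*
* <p>A minimal sketch (the alias and file names are illustrative):
* <pre>{@code
* pigServer.store("B", "output");
* }</pre>
*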
* @param id the alias to store
* @param filename the file to store to
* @throws IOException
*/
public void store(String id, String filename) throws IOException {
store(id, filename, PigStorage.class.getName() + "()"); // PigStorage is the default store function
}
/**
* Forces execution of the query behind the given alias (and all queries
* it reads from), in order to store the result in a file.
*/
public void store(String id, String filename, String func) throws IOException {
if (!aliases.containsKey(id))
throw new IOException("Invalid alias: " + id);
if (FileLocalizer.fileExists(filename, pigContext.getDfs())) {
StringBuilder sb = new StringBuilder();
sb.append("Output file ");
sb.append(filename);
sb.append(" already exists. Can't overwrite.");
throw new IOException(sb.toString());
}
LogicalPlan readFrom = aliases.get(id);
store(readFrom,filename,func);
}
public void store(LogicalPlan readFrom, String filename, String func) throws IOException {
LogicalPlan storePlan = QueryParser.generateStorePlan(readFrom.getOpTable(),
scope,
readFrom,
filename,
func,
pigContext);
// Optimize
Optimizer optimizer = new LoadOptimizer();
optimizer.optimize(readFrom);
try {
optimizeAndRunQuery(storePlan);
}
catch (ExecException e) {
throw WrappedIOException.wrap("Unable to store alias " +
storePlan.getAlias(), e);
}
}
private ExecJob optimizeAndRunQuery(LogicalPlan root) throws ExecException {
// Optimize the LogicalPlan
Optimizer loadOptimizer = new LoadOptimizer();
loadOptimizer.optimize(root);
Optimizer storeOptimizer = new StoreOptimizer();
storeOptimizer.optimize(root);
// Execute
ExecPhysicalPlan pp =
pigContext.getExecutionEngine().compile(root, null);
return pigContext.getExecutionEngine().execute(pp);
}
/**
* Provide information on how a pig query will be executed. For now
* this information is very developer focused, and probably not very
* useful to the average user.
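*
* <p>A minimal sketch (the alias name is illustrative):
* <pre>{@code
* pigServer.explain("B", System.out);
* }</pre>
*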
* @param alias Name of alias to explain.
* @param stream PrintStream to write explanation to.
* @throws IOException if the requested alias cannot be found.
*/
public void explain(String alias,
PrintStream stream) throws IOException {
stream.println("Logical Plan:");
LogicalPlan lp = aliases.get(alias);
if (lp == null) {
log.error("Invalid alias: " + alias);
stream.println("Invalid alias: " + alias);
throw new IOException("Invalid alias: " + alias);
}
lp.explain(stream);
stream.println("-----------------------------------------------");
stream.println("Physical Plan:");
try {
ExecPhysicalPlan pp =
pigContext.getExecutionEngine().compile(lp, null);
pp.explain(stream);
}
catch (ExecException e) {
StringBuilder sbException = new StringBuilder();
sbException.append("Failed to compile to phyiscal plan: ");
sbException.append(alias);
if (log.isErrorEnabled()) {
log.error(sbException.toString());
}
StringBuilder sb = new StringBuilder();
sb.append("Failed to compile the logical plan for ");
sb.append(alias);
sb.append(" into a physical plan");
stream.println(sb.toString());
throw WrappedIOException.wrap(sbException.toString(), e);
}
}
/**
* Returns the unused byte capacity of an HDFS filesystem. This value does
* not take the replication factor into account, as that can vary from file
* to file. Thus if you are using this to determine whether your data set
* will fit in HDFS, you need to divide the result of this call by your
* specific replication setting.
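*
* <p>For example, with a (hypothetical) replication factor of 3:
* <pre>{@code
* long usableBytes = pigServer.capacity() / 3;
* }</pre>
*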
* @return the unused capacity of the filesystem, in bytes
* @throws IOException
*/
public long capacity() throws IOException {
if (pigContext.getExecType() == ExecType.LOCAL) {
throw new IOException("capacity only supported for non-local execution");
}
else {
DataStorage dds = pigContext.getDfs();
Map<String, Object> stats = dds.getStatistics();
String rawCapacityStr = (String) stats.get(DataStorage.RAW_CAPACITY_KEY);
String rawUsedStr = (String) stats.get(DataStorage.RAW_USED_KEY);
if ((rawCapacityStr == null) || (rawUsedStr == null)) {
throw new IOException("Failed to retrieve capacity stats");
}
long rawCapacityBytes = Long.parseLong(rawCapacityStr);
long rawUsedBytes = Long.parseLong(rawUsedStr);
return rawCapacityBytes - rawUsedBytes;
}
}
/**
* Returns the on-disk size of a file in HDFS, in bytes: the file's length
* multiplied by its replication factor.
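*
* <p>A minimal sketch (the file name is illustrative):
* <pre>{@code
* long bytesOnDisk = pigServer.fileSize("input.txt");
* }</pre>
*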
* @param filename path of the file to measure
* @throws IOException
*/
public long fileSize(String filename) throws IOException {
DataStorage dfs = pigContext.getDfs();
ElementDescriptor elem = dfs.asElement(filename);
Map<String, Object> stats = elem.getStatistics();
long length = (Long) stats.get(ElementDescriptor.LENGTH_KEY);
int replication = (Short) stats
.get(ElementDescriptor.BLOCK_REPLICATION_KEY);
return length * replication;
}
public boolean existsFile(String filename) throws IOException {
ElementDescriptor elem = pigContext.getDfs().asElement(filename);
return elem.exists();
}
public boolean deleteFile(String filename) throws IOException {
ElementDescriptor elem = pigContext.getDfs().asElement(filename);
elem.delete();
return true;
}
public boolean renameFile(String source, String target) throws IOException {
pigContext.rename(source, target);
return true;
}
public boolean mkdirs(String dirs) throws IOException {
ContainerDescriptor container = pigContext.getDfs().asContainer(dirs);
container.create();
return true;
}
public String[] listPaths(String dir) throws IOException {
Collection<String> allPaths = new ArrayList<String>();
ContainerDescriptor container = pigContext.getDfs().asContainer(dir);
Iterator<ElementDescriptor> iter = container.iterator();
while (iter.hasNext()) {
ElementDescriptor elem = iter.next();
allPaths.add(elem.toString());
}
return allPaths.toArray(new String[allPaths.size()]);
}
public long totalHadoopTimeSpent() {
return MapReduceLauncher.totalHadoopTimeSpent;
}
public Map<String, LogicalPlan> getAliases() {
return this.aliases;
}
public void shutdown() {
// clean-up activities
// TODO: reclaim scope to free up resources. Currently
// this is not implemented and throws an exception
// hence, for now, we won't call it.
//
// pigContext.getExecutionEngine().reclaimScope(this.scope);
}
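/**
* Generates example data illustrating what the query behind the given
* alias does, and prints it in tabular form.
*
* @param id the alias to illustrate
* @throws IOException if the alias is invalid
*/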
public void showExamples(String id) throws IOException {
if (!aliases.containsKey(id))
throw new IOException("Invalid alias: " + id);
LogicalPlan root = aliases.get(id);
showExamples(root);
}
public void showExamples(LogicalPlan lp) throws IOException {
Map<LogicalOperator, DataBag> exampleData = ExGen.GenerateExamples(lp, pigContext);
this.Result = DisplayExamples.PrintTabular(lp, exampleData);
System.out.println(Result);
}
}