/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.tools.grunt;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Reader;
import java.io.StringReader;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import jline.console.ConsoleReader;

import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.TupleFormat;
import org.apache.pig.parser.RegisterResolver;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.apache.pig.tools.pigscript.parser.PigScriptParser;
import org.apache.pig.tools.pigscript.parser.PigScriptParserTokenManager;
import org.apache.pig.tools.pigscript.parser.TokenMgrError;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.validator.BlackAndWhitelistFilter;
import org.apache.pig.validator.PigCommandFilter;
import org.fusesource.jansi.Ansi;
import org.fusesource.jansi.AnsiConsole;

import com.google.common.collect.Lists;

public class GruntParser extends PigScriptParser {

    private static final Log log = LogFactory.getLog(GruntParser.class);
    private PigCommandFilter filter;

    public GruntParser(Reader reader) {
        this(reader, null);
        init();
    }

    public GruntParser(Reader reader, PigServer pigServer) {
        super(reader);
        mPigServer = pigServer;
        init();
    }

    public GruntParser(InputStream stream, String encoding) {
        this(stream, encoding, null);
    }

    public GruntParser(InputStream stream, String encoding, PigServer pigServer) {
        super(stream, encoding);
        mPigServer = pigServer;
        init();
    }

    public GruntParser(InputStream stream) {
        super(stream);
        init();
    }

    public GruntParser(InputStream stream, PigServer pigServer) {
        super(stream);
        mPigServer = pigServer;
        init();
    }

    public GruntParser(PigScriptParserTokenManager tm) {
        this(tm, null);
    }

    public GruntParser(PigScriptParserTokenManager tm, PigServer pigServer) {
        super(tm);
        mPigServer = pigServer;
        init();
    }

    private void init() {
        mDone = false;
        mLoadOnly = false;
        mExplain = null;
        mScriptIllustrate = false;

        setProps();

        filter = new BlackAndWhitelistFilter(mPigServer);
    }

    private void setProps() {
        mDfs = mPigServer.getPigContext().getDfs();
        mLfs = mPigServer.getPigContext().getLfs();
        mConf = mPigServer.getPigContext().getProperties();
        shell = new FsShell(ConfigurationUtil.toConfiguration(mConf));
    }

    @Override
    public void setInteractive(boolean isInteractive) {
        super.setInteractive(isInteractive);
        if(isInteractive) {
            setValidateEachStatement(true);
        }
    }

    public void setValidateEachStatement(boolean b) {
        mPigServer.setValidateEachStatement(b);
    }

    private void setBatchOn() {
        mPigServer.setBatchOn();
    }

    private void executeBatch() throws IOException {
        if (mPigServer.isBatchOn()) {
            if (mExplain != null) {
                explainCurrentBatch();
            }

            if (!mLoadOnly) {
                mPigServer.executeBatch();
                PigStats stats = PigStats.get();
                JobGraph jg = stats.getJobGraph();
                Iterator<JobStats> iter = jg.iterator();
                while (iter.hasNext()) {
                    JobStats js = iter.next();
                    if (!js.isSuccessful()) {
                        mNumFailedJobs++;
                        Exception exp = (js.getException() != null) ? js.getException()
                                : new ExecException(
                                        "Job " + (js.getJobId() == null ? "" : js.getJobId() + " ") +
                                        "failed, hadoop does not return any error message",
                                        2244);
                        LogUtils.writeLog(exp,
                                mPigServer.getPigContext().getProperties().getProperty("pig.logfile"),
                                log,
                                "true".equalsIgnoreCase(mPigServer.getPigContext().getProperties().getProperty("verbose")),
                                "Pig Stack Trace");
                    } else {
                        mNumSucceededJobs++;
                    }
                }
            }
        }
    }

    private void discardBatch() throws IOException {
        if (mPigServer.isBatchOn()) {
            mPigServer.discardBatch();
        }
    }

    public int[] parseStopOnError() throws IOException, ParseException
    {
        return parseStopOnError(false);
    }

    /**
     * Parses Pig commands in either interactive mode or batch mode.
     * In interactive mode, executes the plan right away whenever a
     * STORE command is encountered.
     *
     * @throws IOException, ParseException
     */
    public int[] parseStopOnError(boolean sameBatch) throws IOException, ParseException
    {
        if (mPigServer == null) {
            throw new IllegalStateException();
        }

        if (!mInteractive && !sameBatch) {
            setBatchOn();
            mPigServer.setSkipParseInRegisterForBatch(true);
        }

        try {
            prompt();
            mDone = false;
            while(!mDone) {
                parse();
            }

            if (!sameBatch) {
                executeBatch();
            }
        } catch (TokenMgrError tme) {
            // This error from PigScriptParserTokenManager is not intuitive and always refers to the
            // last line, if we are reading whole query without parsing it line by line during batch
            // So executeBatch and get the error from QueryParser
            if (!mInteractive && !sameBatch) {
                executeBatch();
            }
            throw tme;
        } finally {
            if (!sameBatch) {
                discardBatch();
            }
        }
        int [] res = { mNumSucceededJobs, mNumFailedJobs };
        return res;
    }

    public void setLoadOnly(boolean loadOnly)
    {
        mLoadOnly = loadOnly;
    }

    public void setScriptIllustrate() {
        mScriptIllustrate = true;
    }

    @Override
    public void prompt()
    {
        if (mInteractive) {
            mConsoleReader.setPrompt("grunt> ");
        }
    }

    @Override
    protected void quit()
    {
        mDone = true;
    }

    public boolean isDone() {
        return mDone;
    }

    /*
     * parseOnly method added for supporting penny
     */
    public void parseOnly() throws IOException, ParseException {
        if (mPigServer == null) {
            throw new IllegalStateException();
        }

        mDone = false;
        while(!mDone) {
            parse();
        }
    }

    @Override
    protected void processDescribe(String alias) throws IOException {
        String nestedAlias = null;
        if(mExplain == null) { // process only if not in "explain" mode

            if (mPigServer.isBatchOn()) {
                mPigServer.parseAndBuild();
            }

            if(alias==null) {
                alias = mPigServer.getPigContext().getLastAlias();
                // if describe is used immediately after launching grunt shell then
                // last defined alias will be null
                if (alias == null) {
                    throw new IOException(
                            "No previously defined alias found. Please define an alias and use 'describe' operator.");
                }
            }
            if(alias.contains("::")) {
                nestedAlias = alias.substring(alias.indexOf("::") + 2);
                alias = alias.substring(0, alias.indexOf("::"));
                mPigServer.dumpSchemaNested(alias, nestedAlias);
            }
            else {
                if ("@".equals(alias)) {
                    alias = mPigServer.getLastRel();
                }
                mPigServer.dumpSchema(alias);
            }
        } else {
            log.warn("'describe' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processExplain(String alias, String script, boolean isVerbose,
                                  String format, String target,
                                  List<String> params, List<String> files)
    throws IOException, ParseException {
        if (mPigServer.isBatchOn()) {
            mPigServer.parseAndBuild();
        }
        if (alias == null && script == null) {
            if (mInteractive) {
                alias = mPigServer.getPigContext().getLastAlias();
                // if explain is used immediately after launching grunt shell then
                // last defined alias will be null
                if (alias == null) {
                    throw new ParseException("'explain' statement must be on an alias or on a script.");
                }
            }
        }
        if ("@".equals(alias)) {
            alias = mPigServer.getLastRel();
        }
        processExplain(alias, script, isVerbose, format, target, params, files,
                false);
    }

    protected void processExplain(String alias, String script, boolean isVerbose,
                                  String format, String target,
                                  List<String> params, List<String> files,
                                  boolean dontPrintOutput)
        throws IOException, ParseException {
        filter.validate(PigCommandFilter.Command.EXPLAIN);
        if (null != mExplain) {
            return;
        }

        try {
            mExplain = new ExplainState(alias, target, script, isVerbose, format);

            if (script != null) {
                setBatchOn();
                try {
                    loadScript(script, true, true, false, params, files);
                } catch(IOException e) {
                    discardBatch();
                    throw e;
            } catch (ParseException e) {
                    discardBatch();
                    throw e;
                }
            }

            mExplain.mLast = true;
            explainCurrentBatch(dontPrintOutput);

        } finally {
            if (script != null) {
                discardBatch();
            }
            mExplain = null;
        }
    }

    protected void explainCurrentBatch() throws IOException {
        explainCurrentBatch(false);
    }

    protected void explainCurrentBatch(boolean dontPrintOutput) throws IOException {
        PrintStream lp = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out;
        PrintStream ep = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out;

        if (!(mExplain.mLast && mExplain.mCount == 0)) {
            if (mPigServer.isBatchEmpty()) {
                return;
            }
        }

        mExplain.mCount++;
        boolean markAsExecuted = (mExplain.mScript != null);

        if (mExplain.mTarget != null) {
            File file = new File(mExplain.mTarget);

            if (file.isDirectory()) {
                String sCount = (mExplain.mLast && mExplain.mCount == 1)?"":"_"+mExplain.mCount;
                String suffix = mExplain.mTime+sCount+"."+mExplain.mFormat;
                lp = new PrintStream(new File(file, "logical_plan-"+suffix));
                mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
                                   mExplain.mVerbose, markAsExecuted, lp, null, file, suffix);
                lp.close();
                ep.close();
            }
            else {
                boolean append = !(mExplain.mCount==1);
                lp = ep = new PrintStream(new FileOutputStream(mExplain.mTarget, append));
                mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
                                   mExplain.mVerbose, markAsExecuted, lp, ep, null, null);
                lp.close();
            }
        }
        else {
            mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
                               mExplain.mVerbose, markAsExecuted, lp, ep, null, null);
        }
    }

    @Override
    protected void printAliases() throws IOException {
        if(mExplain == null) { // process only if not in "explain" mode
            mPigServer.printAliases();
        } else {
            log.warn("'aliases' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void printClear() {
        AnsiConsole.systemInstall();
        Ansi ansi = Ansi.ansi();
        System.out.println( ansi.eraseScreen() );
        System.out.println( ansi.cursor(0, 0) );
        AnsiConsole.systemUninstall();
    }

    @Override
    protected void processRegister(String jar) throws IOException {
        filter.validate(PigCommandFilter.Command.REGISTER);
        jar = parameterSubstitutionInGrunt(jar);
        mPigServer.registerJar(jar);
    }

    @Override
    protected void processRegister(String path, String scriptingLang, String namespace) throws IOException, ParseException {
        filter.validate(PigCommandFilter.Command.REGISTER);
        path = parameterSubstitutionInGrunt(path);
        scriptingLang = parameterSubstitutionInGrunt(scriptingLang);
        namespace = parameterSubstitutionInGrunt(namespace);
        new RegisterResolver(mPigServer).parseRegister(path, scriptingLang, namespace);
    }

    private String runPreprocessor(String scriptPath, List<String> params, List<String> paramFiles)
        throws IOException, ParseException {

        PigContext context = mPigServer.getPigContext();
        BufferedReader reader = new BufferedReader(new FileReader(scriptPath));
        String result = context.doParamSubstitution(reader, params, paramFiles);
        reader.close();
        return result;
    }

    @Override
    protected void processScript(String script, boolean batch,
                                 List<String> params, List<String> files)
        throws IOException, ParseException {
        if(batch) {
            filter.validate(PigCommandFilter.Command.EXEC);
        } else {
            filter.validate(PigCommandFilter.Command.RUN);
        }

        if(mExplain == null) { // process only if not in "explain" mode
            if (script == null) {
                executeBatch();
                return;
            }

            if (batch) {
                setBatchOn();
                mPigServer.setJobName(script);
                try {
                    FetchFileRet scriptFile = FileLocalizer.fetchFile(mConf, script);
                    ScriptState.get().beginNestedScript(scriptFile.file);
                    loadScript(scriptFile, script, true, false, mLoadOnly, params, files);
                    executeBatch();
                } finally {
                    ScriptState.get().endNestedScript();
                    discardBatch();
                }
            } else {
                loadScript(script, false, false, mLoadOnly, params, files);
            }
        } else {
            log.warn("'run/exec' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    private void loadScript(String script, boolean batch, boolean loadOnly, boolean illustrate,
            List<String> params, List<String> files) throws IOException, ParseException {
        loadScript(FileLocalizer.fetchFile(mConf, script), script, batch, loadOnly, illustrate, params, files);
    }

    private void loadScript(FetchFileRet fetchFile, String script, boolean batch, boolean loadOnly, boolean illustrate,
                            List<String> params, List<String> files)
        throws IOException, ParseException {

        Reader inputReader;
        ConsoleReader reader;
        boolean interactive;

        PigContext pc = mPigServer.getPigContext();

        if( !loadOnly ) {
          pc.getPreprocessorContext().paramScopePush();
        }
        pc.setParams(params);
        pc.setParamFiles(files);

        try {
            String cmds = runPreprocessor(fetchFile.file.getAbsolutePath(), params, files);

            if (mInteractive && !batch) { // Write prompt and echo commands
                // Console reader treats tabs in a special way
                cmds = cmds.replaceAll("\t","    ");

                reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()),
                                           System.out);
                reader.setHistory(mConsoleReader.getHistory());
                InputStream in = new ConsoleReaderInputStream(reader);
                inputReader = new BufferedReader(new InputStreamReader(in));
                interactive = true;
            } else { // Quietly parse the statements
                inputReader = new StringReader(cmds);
                reader = null;
                interactive = false;
            }
        } catch (FileNotFoundException fnfe) {
            throw new ParseException("File not found: " + script);
        } catch (SecurityException se) {
            throw new ParseException("Cannot access file: " + script);
        }

        GruntParser parser = new GruntParser(inputReader, mPigServer);
        parser.setConsoleReader(reader);
        parser.setInteractive(interactive);
        parser.setLoadOnly(loadOnly);
        if (illustrate)
            parser.setScriptIllustrate();
        parser.mExplain = mExplain;

        parser.prompt();
        while(!parser.isDone()) {
            parser.parse();
        }

        if (interactive) {
            System.out.println("");
        }
        if( ! loadOnly ) {
          pc.getPreprocessorContext().paramScopePop();
        }
    }

    @Override
    protected void processSet(String key, String value) throws IOException, ParseException {
        filter.validate(PigCommandFilter.Command.SET);
        key = parameterSubstitutionInGrunt(key);
        value = parameterSubstitutionInGrunt(value);
        if (key.equals("debug"))
        {
            if (value != null) {
                if (value.equals("on"))
                    mPigServer.debugOn();
                else if (value.equals("off"))
                    mPigServer.debugOff();
                else
                    throw new ParseException("Invalid value " + value + " provided for " + key);
            } else {
                System.out.println(key + "=" + mPigServer.isDebugOn());
            }
        }
        else if (key.equals("job.name"))
        {
            if (value != null) {
                mPigServer.setJobName(value);
            } else {
                System.out.println(key + "=" + mPigServer.getJobName());
            }
        }
        else if (key.equals("job.priority"))
        {
            if (value != null) {
                mPigServer.setJobPriority(value);
            } else {
                System.out.println(key + "=" + mPigServer.getJobPriority());
            }
        }
        else if (key.equals("stream.skippath")) {
            if (value != null) {
                // Validate
                File file = new File(value);
                if (!file.exists() || file.isDirectory()) {
                    throw new IOException("Invalid value for stream.skippath:" +
                                          value);
                }
                mPigServer.addPathToSkip(value);
            } else {
                System.out.println(key + "=" + StringUtils.join(mPigServer.getPigContext().getPathsToSkip(), ","));
            }
        }
        else if (key.equals("default_parallel")) {
            if (value != null) {
                // Validate
                try {
                    mPigServer.setDefaultParallel(Integer.parseInt(value));
                } catch (NumberFormatException e) {
                    throw new ParseException("Invalid value for default_parallel");
                }
            } else {
                System.out.println(key + "=" + mPigServer.getPigContext().getDefaultParallel());
            }
        }
        else
        {
            if (value != null) {
                mPigServer.getPigContext().getExecutionEngine().setProperty(key, value);
            } else {
                if (mPigServer.getPigContext().getProperties().containsKey(key)) {
                    System.out.println(key + "=" + mPigServer.getPigContext().getProperties().getProperty(key));
                } else {
                    System.out.println(key + " is not defined");
                }
            }
        }
    }

    @Override
    protected void processSet() throws IOException, ParseException {
        filter.validate(PigCommandFilter.Command.SET);
        Properties jobProps = mPigServer.getPigContext().getProperties();
        Properties sysProps = System.getProperties();

        List<String> jobPropsList = Lists.newArrayList();
        List<String> sysPropsList = Lists.newArrayList();

        for (Object key : jobProps.keySet()) {
            String propStr = key + "=" + jobProps.getProperty((String) key);
            if (sysProps.containsKey(key)) {
                sysPropsList.add("system: " + propStr);
            } else {
                jobPropsList.add(propStr);
            }
        }
        Collections.sort(jobPropsList);
        Collections.sort(sysPropsList);
        jobPropsList.addAll(sysPropsList);
        for (String prop : jobPropsList) {
            System.out.println(prop);
        }
    }

    @Override
    protected void processCat(String path) throws IOException {
        filter.validate(PigCommandFilter.Command.CAT);
        path = parameterSubstitutionInGrunt(path);
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            try {
                byte buffer[] = new byte[65536];
                ElementDescriptor dfsPath = mDfs.asElement(path);
                int rc;

                if (!dfsPath.exists())
                    throw new IOException("Directory " + path + " does not exist.");

                if (mDfs.isContainer(path)) {
                    ContainerDescriptor dfsDir = (ContainerDescriptor) dfsPath;
                    Iterator<ElementDescriptor> paths = dfsDir.iterator();

                    while (paths.hasNext()) {
                        ElementDescriptor curElem = paths.next();

                        if (mDfs.isContainer(curElem.toString())) {
                            continue;
                        }

                        InputStream is = curElem.open();
                        while ((rc = is.read(buffer)) > 0) {
                            System.out.write(buffer, 0, rc);
                        }
                        is.close();
                    }
                }
                else {
                    InputStream is = dfsPath.open();
                    while ((rc = is.read(buffer)) > 0) {
                        System.out.write(buffer, 0, rc);
                    }
                    is.close();
                }
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to Cat: " + path, e);
            }
        } else {
            log.warn("'cat' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processCD(String path) throws IOException {
        filter.validate(PigCommandFilter.Command.CD);
        path = parameterSubstitutionInGrunt(path);
        ContainerDescriptor container;
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            try {
                if (path == null) {
                    container = mDfs.asContainer(((HDataStorage)mDfs).getHFS().getHomeDirectory().toString());
                    mDfs.setActiveContainer(container);
                }
                else
                {
                    container = mDfs.asContainer(path);

                    if (!container.exists()) {
                        throw new IOException("Directory " + path + " does not exist.");
                    }

                    if (!mDfs.isContainer(path)) {
                        throw new IOException(path + " is not a directory.");
                    }

                    mDfs.setActiveContainer(container);
                }
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to change working directory to " +
                                      ((path == null) ? (((HDataStorage)mDfs).getHFS().getHomeDirectory().toString())
                                                         : (path)), e);
            }
        } else {
            log.warn("'cd' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processDump(String alias) throws IOException {
        filter.validate(PigCommandFilter.Command.DUMP);
        if (alias == null) {
            if (mPigServer.isBatchOn()) {
                mPigServer.parseAndBuild();
            }
            alias = mPigServer.getPigContext().getLastAlias();
            // if dump is used immediately after launching grunt shell then
            // last defined alias will be null
            if (alias == null) {
                throw new IOException(
                        "No previously defined alias found. Please define an alias and use 'dump' operator.");
            }
        }

        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            if ("@".equals(alias)) {
                alias = mPigServer.getLastRel();
            }
            Iterator<Tuple> result = mPigServer.openIterator(alias);
            while (result.hasNext())
            {
                Tuple t = result.next();
                System.out.println(TupleFormat.format(t));
            }
        } else {
            log.warn("'dump' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processIllustrate(String alias, String script, String target, List<String> params, List<String> files) throws IOException, ParseException
    {
        filter.validate(PigCommandFilter.Command.ILLUSTRATE);
        if (mScriptIllustrate)
            throw new ParseException("'illustrate' statement can not appear in a script that is illustrated opon.");

        if ((alias != null) && (script != null))
            throw new ParseException("'illustrate' statement on an alias does not work when a script is in effect");
        else if (mExplain != null)
            log.warn("'illustrate' statement is ignored while processing 'explain -script' or '-check'");
        else {
            try {
                if (script != null) {
                    if (!"true".equalsIgnoreCase(mPigServer.
                                                 getPigContext()
                                                 .getProperties().
                                                 getProperty("opt.multiquery","true"))) {
                        throw new ParseException("Cannot explain script if multiquery is disabled.");
                    }
                    setBatchOn();
                    try {
                        loadScript(script, true, true, true, params, files);
                    } catch(IOException e) {
                        discardBatch();
                        throw e;
                    } catch (ParseException e) {
                        discardBatch();
                        throw e;
                    }
                } else if (alias == null) {
                    if (mPigServer.isBatchOn()) {
                        mPigServer.parseAndBuild();
                    }
                    alias = mPigServer.getPigContext().getLastAlias();
                    // if illustrate is used immediately after launching grunt shell then
                    // last defined alias will be null
                    if (alias == null) {
                        throw new ParseException("'illustrate' statement must be on an alias or on a script.");
                    }
                }
                if ("@".equals(alias)) {
                    if (mPigServer.isBatchOn()) {
                        mPigServer.parseAndBuild();
                    }
                    alias = mPigServer.getLastRel();
                }
                mPigServer.getExamples(alias);
            } finally {
                if (script != null) {
                    discardBatch();
                }
            }
        }
    }

    @Override
    protected void processKill(String jobid) throws IOException
    {
        filter.validate(PigCommandFilter.Command.KILL);
        mPigServer.getPigContext().getExecutionEngine().killJob(jobid);
    }

    @Override
    protected void processLS(String path) throws IOException {
        filter.validate(PigCommandFilter.Command.LS);
        path = parameterSubstitutionInGrunt(path);

        if (mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            try {
                ElementDescriptor pathDescriptor;

                if (path == null) {
                    pathDescriptor = mDfs.getActiveContainer();
                }
                else {
                    pathDescriptor = mDfs.asElement(path);
                }

                if (!pathDescriptor.exists()) {
                    throw new IOException("File or directory " + path + " does not exist.");
                }

                if (mDfs.isContainer(pathDescriptor.toString())) {
                    ContainerDescriptor container = (ContainerDescriptor) pathDescriptor;
                    Iterator<ElementDescriptor> elems = container.iterator();

                    while (elems.hasNext()) {
                        ElementDescriptor curElem = elems.next();

                        if (mDfs.isContainer(curElem.toString())) {
                               System.out.println(curElem.toString() + "\t<dir>");
                        } else {
                            printLengthAndReplication(curElem);
                        }
                    }
                } else {
                    printLengthAndReplication(pathDescriptor);
                }
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to LS on " + path, e);
            }
        } else {
            log.warn("'ls' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    private void printLengthAndReplication(ElementDescriptor elem)
            throws IOException {
        Map<String, Object> stats = elem.getStatistics();

        long replication = (Short) stats
                .get(ElementDescriptor.BLOCK_REPLICATION_KEY);
        long len = (Long) stats.get(ElementDescriptor.LENGTH_KEY);

        System.out.println(elem.toString() + "<r " + replication + ">\t" + len);
    }

    @Override
    protected void processPWD() throws IOException
    {
        filter.validate(PigCommandFilter.Command.PWD);
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            System.out.println(mDfs.getActiveContainer().toString());
        } else {
            log.warn("'pwd' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processHistory(boolean withNumbers) {
        mPigServer.printHistory(withNumbers);
    }

    @Override
    protected void printHelp()
    {
        System.out.println("Commands:");
        System.out.println("<pig latin statement>; - See the PigLatin manual for details: http://hadoop.apache.org/pig");
        System.out.println("File system commands:");
        System.out.println("    fs <fs arguments> - Equivalent to Hadoop dfs command: http://hadoop.apache.org/common/docs/current/hdfs_shell.html");
        System.out.println("Diagnostic commands:");
        System.out.println("    describe <alias>[::<alias] - Show the schema for the alias. Inner aliases can be described as A::B.");
        System.out.println("    explain [-script <pigscript>] [-out <path>] [-brief] [-dot|-xml] [-param <param_name>=<param_value>]");
        System.out.println("        [-param_file <file_name>] [<alias>] - Show the execution plan to compute the alias or for entire script.");
        System.out.println("        -script - Explain the entire script.");
        System.out.println("        -out - Store the output into directory rather than print to stdout.");
        System.out.println("        -brief - Don't expand nested plans (presenting a smaller graph for overview).");
        System.out.println("        -dot - Generate the output in .dot format. Default is text format.");
        System.out.println("        -xml - Generate the output in .xml format. Default is text format.");
        System.out.println("        -param <param_name - See parameter substitution for details.");
        System.out.println("        -param_file <file_name> - See parameter substitution for details.");
        System.out.println("        alias - Alias to explain.");
        System.out.println("    dump <alias> - Compute the alias and writes the results to stdout.");
        System.out.println("Utility Commands:");
        System.out.println("    exec [-param <param_name>=param_value] [-param_file <file_name>] <script> - ");
        System.out.println("        Execute the script with access to grunt environment including aliases.");
        System.out.println("        -param <param_name - See parameter substitution for details.");
        System.out.println("        -param_file <file_name> - See parameter substitution for details.");
        System.out.println("        script - Script to be executed.");
        System.out.println("    run [-param <param_name>=param_value] [-param_file <file_name>] <script> - ");
        System.out.println("        Execute the script with access to grunt environment. ");
        System.out.println("        -param <param_name - See parameter substitution for details.");
        System.out.println("        -param_file <file_name> - See parameter substitution for details.");
        System.out.println("        script - Script to be executed.");
        System.out.println("    sh  <shell command> - Invoke a shell command.");
        System.out.println("    kill <job_id> - Kill the hadoop job specified by the hadoop job id.");
        System.out.println("    set <key> <value> - Provide execution parameters to Pig. Keys and values are case sensitive.");
        System.out.println("        The following keys are supported: ");
        System.out.println("        default_parallel - Script-level reduce parallelism. Basic input size heuristics used by default.");
        System.out.println("        debug - Set debug on or off. Default is off.");
        System.out.println("        job.name - Single-quoted name for jobs. Default is PigLatin:<script name>");
        System.out.println("        job.priority - Priority for jobs. Values: very_low, low, normal, high, very_high. Default is normal");
        System.out.println("        stream.skippath - String that contains the path. This is used by streaming.");
        System.out.println("        any hadoop property.");
        System.out.println("    help - Display this message.");
        System.out.println("    history [-n] - Display the list statements in cache.");
        System.out.println("        -n Hide line numbers. ");
        System.out.println("    quit - Quit the grunt shell.");
    }

    @Override
    protected void processMove(String src, String dst) throws IOException
    {
        filter.validate(PigCommandFilter.Command.MV);
        src = parameterSubstitutionInGrunt(src);
        dst = parameterSubstitutionInGrunt(dst);
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            try {
                ElementDescriptor srcPath = mDfs.asElement(src);
                ElementDescriptor dstPath = mDfs.asElement(dst);

                if (!srcPath.exists()) {
                    throw new IOException("File or directory " + src + " does not exist.");
                }

                srcPath.rename(dstPath);
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to move " + src + " to " + dst, e);
            }
        } else {
            log.warn("'mv' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processCopy(String src, String dst) throws IOException
    {
        filter.validate(PigCommandFilter.Command.CP);
        src = parameterSubstitutionInGrunt(src);
        dst = parameterSubstitutionInGrunt(dst);
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            try {
                ElementDescriptor srcPath = mDfs.asElement(src);
                ElementDescriptor dstPath = mDfs.asElement(dst);

                srcPath.copy(dstPath, mConf, false);
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to copy " + src + " to " + dst, e);
            }
        } else {
            log.warn("'cp' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processCopyToLocal(String src, String dst) throws IOException
    {
        filter.validate(PigCommandFilter.Command.COPYTOLOCAL);
        src = parameterSubstitutionInGrunt(src);
        dst = parameterSubstitutionInGrunt(dst);
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            try {
                ElementDescriptor srcPath = mDfs.asElement(src);
                ElementDescriptor dstPath = mLfs.asElement(dst);

                srcPath.copy(dstPath, false);
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to copy " + src + "to (locally) " + dst, e);
            }
        } else {
            log.warn("'copyToLocal' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processCopyFromLocal(String src, String dst) throws IOException
    {
        filter.validate(PigCommandFilter.Command.COPYFROMLOCAL);
        src = parameterSubstitutionInGrunt(src);
        dst = parameterSubstitutionInGrunt(dst);
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            try {
                ElementDescriptor srcPath = mLfs.asElement(src);
                ElementDescriptor dstPath = mDfs.asElement(dst);

                srcPath.copy(dstPath, false);
            }
            catch (DataStorageException e) {
                throw new IOException("Failed to copy (loally) " + src + "to " + dst, e);
            }
        } else {
            log.warn("'copyFromLocal' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processMkdir(String dir) throws IOException
    {
        filter.validate(PigCommandFilter.Command.MKDIR);
        dir = parameterSubstitutionInGrunt(dir);
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            ContainerDescriptor dirDescriptor = mDfs.asContainer(dir);
            dirDescriptor.create();
        } else {
            log.warn("'mkdir' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processPig(String cmd) throws IOException
    {
        int start = 1;
        cmd = parameterSubstitutionInGrunt(cmd);
        if (!mInteractive) {
            start = getLineNumber();
        }

        if (cmd.charAt(cmd.length() - 1) != ';') {
            mPigServer.registerQuery(cmd + ";", start);
        }
        else {
            mPigServer.registerQuery(cmd, start);
        }
    }

    @Override
    protected void processRemove(String path, String options) throws IOException {
        filter.validate(PigCommandFilter.Command.RM);
        filter.validate(PigCommandFilter.Command.RMF);
        path = parameterSubstitutionInGrunt(path);
        int MAX_MS_TO_WAIT_FOR_FILE_DELETION = 10 * 60 * 1000;
        int MS_TO_SLEEP_WHILE_WAITING_FOR_FILE_DELETION = 250;

        if(mExplain == null) { // process only if not in "explain" mode
            Path filePath = new Path(path);
            ElementDescriptor dfsPath = null;
            FileSystem fs = filePath.getFileSystem(ConfigurationUtil.toConfiguration(mConf));

            executeBatch();
            if (!fs.exists(filePath)) {
                if (options == null || !options.equalsIgnoreCase("force")) {
                    throw new IOException("File or directory " + path + " does not exist.");
                }
            }
            else {
                boolean deleteSuccess = fs.delete(filePath, true);
                if (!deleteSuccess) {
                    log.warn("Unable to delete " + path);
                    return;
                }
                long startTime = System.currentTimeMillis();
                long duration = 0;
                while(fs.exists(filePath)) {
                    duration = System.currentTimeMillis() - startTime;
                    if (duration > MAX_MS_TO_WAIT_FOR_FILE_DELETION) {
                        throw new IOException("Timed out waiting to delete file: " + dfsPath);
                    } else {
                        try {
                            Thread.sleep(MS_TO_SLEEP_WHILE_WAITING_FOR_FILE_DELETION);
                        } catch (InterruptedException e) {
                            throw new IOException("Error waiting for file deletion", e);
                        }
                    }
                }
                log.info("Waited " + duration + "ms to delete file");
            }
        } else {
            log.warn("'rm/rmf' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processFsCommand(String[] cmdTokens) throws IOException {
        filter.validate(PigCommandFilter.Command.FS);
        for (int i = 0 ; i < cmdTokens.length ; i++) {
            cmdTokens[i] = parameterSubstitutionInGrunt(cmdTokens[i]);
        }
        if(mExplain == null) { // process only if not in "explain" mode

            executeBatch();

            int retCode = -1;

            try {
                retCode = shell.run(cmdTokens);
            } catch (Exception e) {
                throw new IOException(e);
            }

            if (retCode != 0 && !mInteractive) {
                String s = LoadFunc.join(
                        (AbstractList<String>) Arrays.asList(cmdTokens), " ");
                throw new IOException("fs command '" + s
                        + "' failed. Please check output logs for details");
            }
        } else {
            log.warn("'fs' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processShCommand(String[] cmdTokens) throws IOException {
        filter.validate(PigCommandFilter.Command.SH);
        for (int i = 0 ; i < cmdTokens.length ; i++) {
            cmdTokens[i] = parameterSubstitutionInGrunt(cmdTokens[i]);
        }
        if(mExplain == null) { // process only if not in "explain" mode
            try {
                executeBatch();

                // For sh command, create a process with the following syntax
                // <shell exe> <invoke arg> <command-as-string>
                String  shellName = "sh";
                String  shellInvokeArg = "-c";

                // Insert cmd /C in front of the array list to execute to
                // support built-in shell commands like mkdir on Windows
                if (System.getProperty("os.name").startsWith("Windows")) {
                    shellName      = "cmd";
                    shellInvokeArg = "/C";
                }

                List<String> stringList = new ArrayList<String>();
                stringList.add(shellName);
                stringList.add(shellInvokeArg);

                StringBuffer commandString = new StringBuffer();
                for (String currToken : cmdTokens) {
                    commandString.append(" ");
                    commandString.append(currToken);
                }

                stringList.add(commandString.toString());

                String[] newCmdTokens = stringList.toArray(new String[0]);

                Process executor = Runtime.getRuntime().exec(newCmdTokens);

                StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
                StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);

                outPrinter.start();
                errPrinter.start();

                int ret = executor.waitFor();
                outPrinter.join();
                errPrinter.join();
                if (ret != 0 && !mInteractive) {
                    String s = LoadFunc.join(
                            (AbstractList<String>) Arrays.asList(cmdTokens), " ");
                    throw new IOException("sh command '" + s
                            + "' failed. Please check output logs for details");
                }
            } catch (Exception e) {
                throw new IOException(e);
            }
        } else {
            log.warn("'sh' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    public static int runSQLCommand(String hcatBin, String cmd, boolean mInteractive) throws IOException {
        List<String> tokensList = new ArrayList<String>();
		if (hcatBin.endsWith(".py")) {
			tokensList.add("python");
			tokensList.add(hcatBin);
		} else {
			tokensList.add(hcatBin);
		}
		cmd = cmd.trim();
		if (!cmd.substring(0, 3).toLowerCase().equals("sql")) {
		    // Should never happen
		    throw new IOException("sql command not start with sql keyword");
		}
		cmd = cmd.substring(3).trim();
		tokensList.add("-e");
		tokensList.add(cmd.replaceAll("\n", " "));
		String[] tokens = tokensList.toArray(new String[]{});

        // create new environment = environment - HADOOP_CLASSPATH
        // This is because of antlr version conflict between Pig and Hive
        Map<String, String> envs = System.getenv();
        Set<String> envSet = new HashSet<String>();
        for (Map.Entry<String, String> entry : envs.entrySet()) {
            if (!entry.getKey().equals("HADOOP_CLASSPATH")) {
                envSet.add(entry.getKey() + "=" + entry.getValue());
            }
        }

        log.info("Going to run hcat command: " + tokens[tokens.length-1]);
        Process executor = Runtime.getRuntime().exec(tokens, envSet.toArray(new String[0]));
        StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
        StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);

        outPrinter.start();
        errPrinter.start();

        int ret;
        try {
            ret = executor.waitFor();

            outPrinter.join();
            errPrinter.join();
            if (ret != 0 && !mInteractive) {
                throw new IOException("sql command '" + cmd
                        + "' failed. ");
            }
        } catch (InterruptedException e) {
            log.warn("Exception raised from sql command " + e.getLocalizedMessage());
        }
        return 0;
    }

    @Override
    protected void processSQLCommand(String cmd) throws IOException{
        cmd = parameterSubstitutionInGrunt(cmd);
        if(mExplain == null) { // process only if not in "explain" mode
            if (!mPigServer.getPigContext().getProperties().get("pig.sql.type").equals("hcat")) {
                throw new IOException("sql command only support hcat currently");
            }
            String hcatBin = (String)mPigServer.getPigContext().getProperties().get("hcat.bin");
            if (hcatBin == null) {
                throw new IOException("hcat.bin is not defined. Define it to be your hcat script (Usually $HCAT_HOME/bin/hcat");
            }
            if (!(new File(hcatBin).exists())) {
                throw new IOException(hcatBin + " does not exist. Please check your 'hcat.bin' setting in pig.properties.");
            }
            executeBatch();
            runSQLCommand(hcatBin, cmd, mInteractive);
        } else {
            log.warn("'sql' statement is ignored while processing 'explain -script' or '-check'");
        }
    }

    @Override
    protected void processDefault(String key, String value) throws IOException {
        parameterSubstitutionInGrunt("%default " + key + " " + value);
    }

    @Override
    protected void processDeclare(String key, String value) throws IOException {
        parameterSubstitutionInGrunt("%declare " + key + " " + value);
    }

    private String parameterSubstitutionInGrunt(String input) throws IOException {
        if (mInteractive && input != null) {
            return mPigServer.getPigContext().doParamSubstitution(new BufferedReader(new StringReader(input))).trim();
        }
        return input;
    }
    /**
     * StreamPrinter.
     *
     */
    public static class StreamPrinter extends Thread {
        InputStream is;
        String type;
        PrintStream os;

        public StreamPrinter(InputStream is, String type, PrintStream os) {
            this.is = is;
            this.type = type;
            this.os = os;
        }

        @Override
        public void run() {
            try {
                InputStreamReader isr = new InputStreamReader(is);
                BufferedReader br = new BufferedReader(isr);
                String line = null;
                if (type != null) {
                    while ((line = br.readLine()) != null) {
                        os.println(type + ">" + line);
                    }
                } else {
                    while ((line = br.readLine()) != null) {
                        os.println(line);
                    }
                }
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        }
    }

    protected static class ExplainState {
        public long mTime;
        public int mCount;
        public String mAlias;
        public String mTarget;
        public String mScript;
        public boolean mVerbose;
        public String mFormat;
        public boolean mLast;

        public ExplainState(String alias, String target, String script,
                            boolean verbose, String format) {
            mTime = new Date().getTime();
            mCount = 0;
            mAlias = alias;
            mTarget = target;
            mScript = script;
            mVerbose = verbose;
            mFormat = format;
            mLast = false;
        }
    }

    private PigServer mPigServer;
    private DataStorage mDfs;
    private DataStorage mLfs;
    private Properties mConf;
    private boolean mDone;
    private boolean mLoadOnly;
    private ExplainState mExplain;
    private int mNumFailedJobs;
    private int mNumSucceededJobs;
    private FsShell shell;
    private boolean mScriptIllustrate;

    //For Testing Only
    protected void setExplainState(ExplainState explainState) {
        this.mExplain = explainState;
    }

}
