| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /** |
| * This is a helper class for parameter substitution. |
| */ |
| |
| package org.apache.pig.tools.parameters; |
| |
| import java.io.BufferedReader; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.StringReader; |
| import java.util.ArrayDeque; |
| import java.util.Deque; |
| import java.util.Hashtable; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.validator.BlackAndWhitelistFilter; |
| import org.apache.pig.validator.PigCommandFilter; |
| |
| public class PreprocessorContext { |
| |
| private int tableinitsize = 10; |
| private Deque<Map<String,String>> param_val_stack; |
| |
| private PigContext pigContext; |
| |
| public Map<String, String> getParamVal() { |
| Map <String, String> ret = new Hashtable <String, String>(tableinitsize); |
| |
| //stack (deque) iterates LIFO |
| for (Map <String, String> map : param_val_stack ) { |
| for (Map.Entry<String, String> entry : map.entrySet()) { |
| if( ! ret.containsKey(entry.getKey()) ) { |
| ret.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| private final Log log = LogFactory.getLog(getClass()); |
| |
| /** |
| * @param limit - max number of parameters. Passing |
| * smaller number only impacts performance |
| */ |
| public PreprocessorContext(int limit) { |
| tableinitsize = limit; |
| param_val_stack = new ArrayDeque<Map<String,String>> (); |
| param_val_stack.push(new Hashtable<String, String> (tableinitsize)); |
| } |
| |
| public void setPigContext(PigContext context) { |
| this.pigContext = context; |
| } |
| |
| /** |
| * This method generates parameter value by running specified command |
| * |
| * @param key - parameter name |
| * @param val - string containing command to be executed |
| */ |
| public void processShellCmd(String key, String val) throws ParameterSubstitutionException, FrontendException { |
| processShellCmd(key, val, true); |
| } |
| |
| /** |
| * This method generates value for the specified key by |
| * performing substitution if needed within the value first. |
| * |
| * @param key - parameter name |
| * @param val - value supplied for the key |
| */ |
| public void processOrdLine(String key, String val) throws ParameterSubstitutionException { |
| processOrdLine(key, val, true); |
| } |
| |
| public void paramScopePush() { |
| param_val_stack.push( new Hashtable<String, String> (tableinitsize) ); |
| } |
| |
| public void paramScopePop() { |
| param_val_stack.pop(); |
| } |
| |
| public boolean paramval_containsKey(String key) { |
| for (Map <String, String> map : param_val_stack ) { |
| if( map.containsKey(key) ) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| public String paramval_get(String key) { |
| for (Map <String, String> map : param_val_stack ) { |
| if( map.containsKey(key) ) { |
| return map.get(key); |
| } |
| } |
| return null; |
| } |
| |
| public void paramval_put(String key, String value) { |
| param_val_stack.peek().put(key, value); |
| } |
| |
| /** |
| * This method generates parameter value by running specified command |
| * |
| * @param key - parameter name |
| * @param val - string containing command to be executed |
| */ |
| public void processShellCmd(String key, String val, Boolean overwrite) throws ParameterSubstitutionException, FrontendException { |
| if (pigContext != null) { |
| BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext); |
| filter.validate(PigCommandFilter.Command.SH); |
| } |
| |
| if (paramval_containsKey(key) && !overwrite) { |
| return; |
| } |
| |
| val = val.substring(1, val.length()-1); //to remove the backticks |
| String sub_val = substitute(val); |
| sub_val = executeShellCommand(sub_val); |
| |
| if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) { |
| //(boolean overwrite is always true here) |
| log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. " |
| + "Previous value " + paramval_get(key) + ", now using value " + sub_val); |
| } |
| |
| paramval_put(key, sub_val); |
| } |
| |
| public void validate(String preprocessorCmd) throws FrontendException { |
| if (pigContext == null) { |
| return; |
| } |
| |
| final BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext); |
| final String declareToken = "%declare"; |
| final String defaultToken = "%default"; |
| |
| if (preprocessorCmd.toLowerCase().equals(declareToken)) { |
| filter.validate(PigCommandFilter.Command.DECLARE); |
| } else if (preprocessorCmd.toLowerCase().equals(defaultToken)) { |
| filter.validate(PigCommandFilter.Command.DEFAULT); |
| } else { |
| throw new IllegalArgumentException("Pig Internal Error. Invalid preprocessor command specified : " |
| + preprocessorCmd); |
| } |
| } |
| |
| /** |
| * This method generates value for the specified key by |
| * performing substitution if needed within the value first. |
| * |
| * @param key - parameter name |
| * @param val - value supplied for the key |
| * @param overwrite - specifies whether the value should be replaced if it already exists |
| */ |
| public void processOrdLine(String key, String val, Boolean overwrite) throws ParameterSubstitutionException { |
| |
| String sub_val = substitute(val, key); |
| if (paramval_containsKey(key)) { |
| if (paramval_get(key).equals(sub_val) || !overwrite) { |
| return; |
| } else { |
| log.warn("Warning : Multiple values found for " + key |
| + ". Previous value " + paramval_get(key) |
| + ", now using value " + sub_val); |
| } |
| } |
| |
| paramval_put(key, sub_val); |
| } |
| |
| /** |
| * Slurp in an entire input stream and close it. |
| */ |
| public static class CallableStreamReader implements Callable<String> { |
| private final InputStream inputStream; |
| |
| public CallableStreamReader(InputStream stream) { |
| inputStream = stream; |
| } |
| |
| @Override |
| public String call() { |
| try { |
| return IOUtils.toString(inputStream); |
| } catch (IOException e) { |
| throw new RuntimeException("IO Exception while executing shell command: " + e.getMessage() , e); |
| } finally { |
| IOUtils.closeQuietly(inputStream); |
| } |
| } |
| } |
| |
| /* |
| * executes the 'cmd' in shell and returns result |
| */ |
| private String executeShellCommand (String cmd) |
| { |
| Process p; |
| String streamData=""; |
| String streamError=""; |
| try { |
| log.info("Executing command : " + cmd); |
| // we can't use exec directly since it does not handle |
| // case like foo -c "bar bar" correctly. It splits on white spaces even in presents of quotes |
| StringBuffer sb = new StringBuffer(""); |
| String[] cmdArgs; |
| if (Shell.WINDOWS) { |
| cmd = cmd.replaceAll("/", "\\\\"); |
| sb.append(cmd); |
| cmdArgs = new String[]{"cmd", "/c", sb.toString() }; |
| } else { |
| sb.append("exec "); |
| sb.append(cmd); |
| cmdArgs = new String[]{"bash", "-c", sb.toString() }; |
| } |
| |
| p = Runtime.getRuntime().exec(cmdArgs); |
| |
| } catch (IOException e) { |
| RuntimeException rte = new RuntimeException("IO Exception while executing shell command : "+e.getMessage() , e); |
| throw rte; |
| } |
| |
| // Read stdout and stderr in separate threads to avoid deadlock due to pipe buffer size |
| ExecutorService executorService = Executors.newFixedThreadPool(2); |
| Future<String> futureOut = executorService.submit(new CallableStreamReader(p.getInputStream())); |
| Future<String> futureErr = executorService.submit(new CallableStreamReader(p.getErrorStream())); |
| |
| try { |
| streamData = futureOut.get(); |
| streamError = futureErr.get(); |
| log.debug("Error stream while executing shell command : " + streamError); |
| } catch (InterruptedException e) { |
| throw new RuntimeException("InterruptedException while executing shell command : " + e.getMessage() , e); |
| } catch (ExecutionException e) { |
| log.warn("Stderr output from command: \"" + cmd + "\" was - " + streamError); |
| throw new RuntimeException("ExecutionException while executing shell command : " + e.getMessage(), e); |
| } finally { |
| executorService.shutdownNow(); |
| } |
| |
| int exitVal; |
| try { |
| exitVal = p.waitFor(); |
| } catch (InterruptedException e) { |
| RuntimeException rte = new RuntimeException("Interrupted Thread Exception while waiting for command to get over"+e.getMessage() , e); |
| throw rte; |
| } |
| |
| if (exitVal != 0) { |
| log.warn("Stderr output from command: \"" + cmd + "\" was - " + streamError); |
| RuntimeException rte = new RuntimeException("Error executing shell command: " + cmd + ". Command exit with exit code of " + exitVal ); |
| throw rte; |
| } |
| |
| return streamData.trim(); |
| } |
| |
| public void loadParamVal(List<String> params, List<String> paramFiles) |
| throws IOException, ParseException { |
| StringReader dummyReader = null; // ParamLoader does not have an empty contructor |
| ParamLoader paramLoader = new ParamLoader(dummyReader); |
| paramLoader.setContext(this); |
| |
| if (paramFiles != null) { |
| for (String path : paramFiles) { |
| BufferedReader in = new BufferedReader(new FileReader(path)); |
| paramLoader.ReInit(in); |
| while (paramLoader.Parse()) {} |
| in.close(); |
| } |
| } |
| |
| if (params != null) { |
| for (String param : params) { |
| paramLoader.ReInit(new StringReader(param)); |
| paramLoader.Parse(); |
| } |
| } |
| } |
| |
| private Pattern bracketIdPattern = Pattern.compile("\\$\\{([_]*[a-zA-Z][a-zA-Z_0-9]*)\\}"); |
| private Pattern id_pattern = Pattern.compile("\\$([_]*[a-zA-Z][a-zA-Z_0-9]*)"); |
| |
| public String substitute(String line) throws ParameterSubstitutionException { |
| return substitute(line, null); |
| } |
| |
| public String substitute(String line, String parentKey) throws ParameterSubstitutionException { |
| int index = line.indexOf('$'); |
| if (index == -1) |
| return line; |
| |
| String replaced_line = line; |
| |
| Matcher bracketKeyMatcher = bracketIdPattern.matcher(line); |
| |
| String key=""; |
| String val=""; |
| |
| while (bracketKeyMatcher.find()) { |
| if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) { |
| key = bracketKeyMatcher.group(1); |
| if (!(paramval_containsKey(key))) { |
| String message; |
| if (parentKey == null) { |
| message = "Undefined parameter : " + key; |
| } else { |
| message = "Undefined parameter : " + key + " found when trying to find the value of " + parentKey + "."; |
| } |
| throw new ParameterSubstitutionException(message); |
| } |
| val = paramval_get(key); |
| val = Matcher.quoteReplacement(val); |
| replaced_line = replaced_line.replaceFirst("\\$\\{"+key+"\\}", val); |
| } |
| } |
| |
| Matcher keyMatcher = id_pattern.matcher( replaced_line ); |
| |
| key=""; |
| val=""; |
| |
| while (keyMatcher.find()) { |
| // make sure that we don't perform parameter substitution |
| // for escaped vars of the form \$<id> |
| if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) { |
| key = keyMatcher.group(1); |
| if (!(paramval_containsKey(key))) { |
| String message; |
| if (parentKey == null) { |
| message = "Undefined parameter : " + key; |
| } else { |
| message = "Undefined parameter : " + key + " found when trying to find the value of " + parentKey + "."; |
| } |
| throw new ParameterSubstitutionException(message); |
| } |
| val = paramval_get(key); |
| val = Matcher.quoteReplacement(val); |
| replaced_line = replaced_line.replaceFirst("\\$"+key, val); |
| } |
| } |
| |
| // unescape $<id> |
| replaced_line = replaced_line.replaceAll("\\\\\\$","\\$"); |
| return replaced_line; |
| } |
| |
| } |
| |
| |