| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.api.java.utils; |
| |
| import org.apache.flink.annotation.Public; |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.util.Preconditions; |
| |
| import org.apache.commons.lang3.math.NumberUtils; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.ObjectInputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| /** |
| * This class provides simple utility methods for reading and parsing program arguments from different sources. |
| */ |
| @Public |
| public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable { |
| private static final long serialVersionUID = 1L; |
| |
| protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY"; |
| protected static final String DEFAULT_UNDEFINED = "<undefined>"; |
| |
| // ------------------ Constructors ------------------------ |
| |
| /** |
| * Returns {@link ParameterTool} for the given arguments. The arguments are keys followed by values. |
| * Keys have to start with '-' or '--' |
| * |
| * <p><strong>Example arguments:</strong> |
| * --key1 value1 --key2 value2 -key3 value3 |
| * |
| * @param args Input array arguments |
| * @return A {@link ParameterTool} |
| */ |
| public static ParameterTool fromArgs(String[] args) { |
| Map<String, String> map = new HashMap<String, String>(args.length / 2); |
| |
| String key = null; |
| String value = null; |
| boolean expectValue = false; |
| for (String arg : args) { |
| // check for -- argument |
| if (arg.startsWith("--")) { |
| if (expectValue) { |
| // we got into a new key, even though we were a value --> current key is one without value |
| if (value != null) { |
| throw new IllegalStateException("Unexpected state"); |
| } |
| map.put(key, NO_VALUE_KEY); |
| // key will be overwritten in the next step |
| } |
| key = arg.substring(2); |
| expectValue = true; |
| } // check for - argument |
| else if (arg.startsWith("-")) { |
| // we are waiting for a value, so this is a - prefixed value (negative number) |
| if (expectValue) { |
| |
| if (NumberUtils.isNumber(arg)) { |
| // negative number |
| value = arg; |
| expectValue = false; |
| } else { |
| if (value != null) { |
| throw new IllegalStateException("Unexpected state"); |
| } |
| // We waited for a value but found a new key. So the previous key doesnt have a value. |
| map.put(key, NO_VALUE_KEY); |
| key = arg.substring(1); |
| expectValue = true; |
| } |
| } else { |
| // we are not waiting for a value, so its an argument |
| key = arg.substring(1); |
| expectValue = true; |
| } |
| } else { |
| if (expectValue) { |
| value = arg; |
| expectValue = false; |
| } else { |
| throw new RuntimeException("Error parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected value. Please prefix values with -- or -."); |
| } |
| } |
| |
| if (value == null && key == null) { |
| throw new IllegalStateException("Value and key can not be null at the same time"); |
| } |
| if (key != null && value == null && !expectValue) { |
| throw new IllegalStateException("Value expected but flag not set"); |
| } |
| if (key != null && value != null) { |
| map.put(key, value); |
| key = null; |
| value = null; |
| expectValue = false; |
| } |
| if (key != null && key.length() == 0) { |
| throw new IllegalArgumentException("The input " + Arrays.toString(args) + " contains an empty argument"); |
| } |
| |
| if (key != null && !expectValue) { |
| map.put(key, NO_VALUE_KEY); |
| key = null; |
| expectValue = false; |
| } |
| } |
| if (key != null) { |
| map.put(key, NO_VALUE_KEY); |
| } |
| |
| return fromMap(map); |
| } |
| |
| /** |
| * Returns {@link ParameterTool} for the given {@link Properties} file. |
| * |
| * @param path Path to the properties file |
| * @return A {@link ParameterTool} |
| * @throws IOException If the file does not exist |
| * @see Properties |
| */ |
| public static ParameterTool fromPropertiesFile(String path) throws IOException { |
| File propertiesFile = new File(path); |
| return fromPropertiesFile(propertiesFile); |
| } |
| |
| /** |
| * Returns {@link ParameterTool} for the given {@link Properties} file. |
| * |
| * @param file File object to the properties file |
| * @return A {@link ParameterTool} |
| * @throws IOException If the file does not exist |
| * @see Properties |
| */ |
| public static ParameterTool fromPropertiesFile(File file) throws IOException { |
| if (!file.exists()) { |
| throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist"); |
| } |
| try (FileInputStream fis = new FileInputStream(file)) { |
| return fromPropertiesFile(fis); |
| } |
| } |
| |
| /** |
| * Returns {@link ParameterTool} for the given InputStream from {@link Properties} file. |
| * |
| * @param inputStream InputStream from the properties file |
| * @return A {@link ParameterTool} |
| * @throws IOException If the file does not exist |
| * @see Properties |
| */ |
| public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException { |
| Properties props = new Properties(); |
| props.load(inputStream); |
| return fromMap((Map) props); |
| } |
| |
| /** |
| * Returns {@link ParameterTool} for the given map. |
| * |
| * @param map A map of arguments. Both Key and Value have to be Strings |
| * @return A {@link ParameterTool} |
| */ |
| public static ParameterTool fromMap(Map<String, String> map) { |
| Preconditions.checkNotNull(map, "Unable to initialize from empty map"); |
| return new ParameterTool(map); |
| } |
| |
| /** |
| * Returns {@link ParameterTool} from the system properties. |
| * Example on how to pass system properties: |
| * -Dkey1=value1 -Dkey2=value2 |
| * |
| * @return A {@link ParameterTool} |
| */ |
| public static ParameterTool fromSystemProperties() { |
| return fromMap((Map) System.getProperties()); |
| } |
| |
| // ------------------ ParameterUtil ------------------------ |
| protected final Map<String, String> data; |
| |
| // data which is only used on the client and does not need to be transmitted |
| protected transient Map<String, String> defaultData; |
| protected transient Set<String> unrequestedParameters; |
| |
| private ParameterTool(Map<String, String> data) { |
| this.data = Collections.unmodifiableMap(new HashMap<>(data)); |
| |
| this.defaultData = new ConcurrentHashMap<>(data.size()); |
| |
| this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); |
| |
| unrequestedParameters.addAll(data.keySet()); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| ParameterTool that = (ParameterTool) o; |
| return Objects.equals(data, that.data) && |
| Objects.equals(defaultData, that.defaultData) && |
| Objects.equals(unrequestedParameters, that.unrequestedParameters); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(data, defaultData, unrequestedParameters); |
| } |
| |
| /** |
| * Returns the set of parameter names which have not been requested with |
| * {@link #has(String)} or one of the {@code get} methods. Access to the |
| * map returned by {@link #toMap()} is not tracked. |
| */ |
| @PublicEvolving |
| public Set<String> getUnrequestedParameters() { |
| return Collections.unmodifiableSet(unrequestedParameters); |
| } |
| |
| // ------------------ Get data from the util ---------------- |
| |
| /** |
| * Returns number of parameters in {@link ParameterTool}. |
| */ |
| public int getNumberOfParameters() { |
| return data.size(); |
| } |
| |
| /** |
| * Returns the String value for the given key. |
| * If the key does not exist it will return null. |
| */ |
| public String get(String key) { |
| addToDefaults(key, null); |
| unrequestedParameters.remove(key); |
| return data.get(key); |
| } |
| |
| /** |
| * Returns the String value for the given key. |
| * If the key does not exist it will throw a {@link RuntimeException}. |
| */ |
| public String getRequired(String key) { |
| addToDefaults(key, null); |
| String value = get(key); |
| if (value == null) { |
| throw new RuntimeException("No data for required key '" + key + "'"); |
| } |
| return value; |
| } |
| |
| /** |
| * Returns the String value for the given key. |
| * If the key does not exist it will return the given default value. |
| */ |
| public String get(String key, String defaultValue) { |
| addToDefaults(key, defaultValue); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } else { |
| return value; |
| } |
| } |
| |
| /** |
| * Check if value is set. |
| */ |
| public boolean has(String value) { |
| addToDefaults(value, null); |
| unrequestedParameters.remove(value); |
| return data.containsKey(value); |
| } |
| |
| // -------------- Integer |
| |
| /** |
| * Returns the Integer value for the given key. |
| * The method fails if the key does not exist or the value is not an Integer. |
| */ |
| public int getInt(String key) { |
| addToDefaults(key, null); |
| String value = getRequired(key); |
| return Integer.parseInt(value); |
| } |
| |
| /** |
| * Returns the Integer value for the given key. If the key does not exists it will return the default value given. |
| * The method fails if the value is not an Integer. |
| */ |
| public int getInt(String key, int defaultValue) { |
| addToDefaults(key, Integer.toString(defaultValue)); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } |
| return Integer.parseInt(value); |
| } |
| |
| // -------------- LONG |
| |
| /** |
| * Returns the Long value for the given key. |
| * The method fails if the key does not exist. |
| */ |
| public long getLong(String key) { |
| addToDefaults(key, null); |
| String value = getRequired(key); |
| return Long.parseLong(value); |
| } |
| |
| /** |
| * Returns the Long value for the given key. If the key does not exists it will return the default value given. |
| * The method fails if the value is not a Long. |
| */ |
| public long getLong(String key, long defaultValue) { |
| addToDefaults(key, Long.toString(defaultValue)); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } |
| return Long.parseLong(value); |
| } |
| |
| // -------------- FLOAT |
| |
| /** |
| * Returns the Float value for the given key. |
| * The method fails if the key does not exist. |
| */ |
| public float getFloat(String key) { |
| addToDefaults(key, null); |
| String value = getRequired(key); |
| return Float.valueOf(value); |
| } |
| |
| /** |
| * Returns the Float value for the given key. If the key does not exists it will return the default value given. |
| * The method fails if the value is not a Float. |
| */ |
| public float getFloat(String key, float defaultValue) { |
| addToDefaults(key, Float.toString(defaultValue)); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } else { |
| return Float.valueOf(value); |
| } |
| } |
| |
| // -------------- DOUBLE |
| |
| /** |
| * Returns the Double value for the given key. |
| * The method fails if the key does not exist. |
| */ |
| public double getDouble(String key) { |
| addToDefaults(key, null); |
| String value = getRequired(key); |
| return Double.valueOf(value); |
| } |
| |
| /** |
| * Returns the Double value for the given key. If the key does not exists it will return the default value given. |
| * The method fails if the value is not a Double. |
| */ |
| public double getDouble(String key, double defaultValue) { |
| addToDefaults(key, Double.toString(defaultValue)); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } else { |
| return Double.valueOf(value); |
| } |
| } |
| |
| // -------------- BOOLEAN |
| |
| /** |
| * Returns the Boolean value for the given key. |
| * The method fails if the key does not exist. |
| */ |
| public boolean getBoolean(String key) { |
| addToDefaults(key, null); |
| String value = getRequired(key); |
| return Boolean.valueOf(value); |
| } |
| |
| /** |
| * Returns the Boolean value for the given key. If the key does not exists it will return the default value given. |
| * The method returns whether the string of the value is "true" ignoring cases. |
| */ |
| public boolean getBoolean(String key, boolean defaultValue) { |
| addToDefaults(key, Boolean.toString(defaultValue)); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } else { |
| return Boolean.valueOf(value); |
| } |
| } |
| |
| // -------------- SHORT |
| |
| /** |
| * Returns the Short value for the given key. |
| * The method fails if the key does not exist. |
| */ |
| public short getShort(String key) { |
| addToDefaults(key, null); |
| String value = getRequired(key); |
| return Short.valueOf(value); |
| } |
| |
| /** |
| * Returns the Short value for the given key. If the key does not exists it will return the default value given. |
| * The method fails if the value is not a Short. |
| */ |
| public short getShort(String key, short defaultValue) { |
| addToDefaults(key, Short.toString(defaultValue)); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } else { |
| return Short.valueOf(value); |
| } |
| } |
| |
| // -------------- BYTE |
| |
| /** |
| * Returns the Byte value for the given key. |
| * The method fails if the key does not exist. |
| */ |
| public byte getByte(String key) { |
| addToDefaults(key, null); |
| String value = getRequired(key); |
| return Byte.valueOf(value); |
| } |
| |
| /** |
| * Returns the Byte value for the given key. If the key does not exists it will return the default value given. |
| * The method fails if the value is not a Byte. |
| */ |
| public byte getByte(String key, byte defaultValue) { |
| addToDefaults(key, Byte.toString(defaultValue)); |
| String value = get(key); |
| if (value == null) { |
| return defaultValue; |
| } else { |
| return Byte.valueOf(value); |
| } |
| } |
| |
| // --------------- Internals |
| |
| protected void addToDefaults(String key, String value) { |
| String currentValue = defaultData.get(key); |
| if (currentValue == null) { |
| if (value == null) { |
| value = DEFAULT_UNDEFINED; |
| } |
| defaultData.put(key, value); |
| } else { |
| // there is already an entry for this key. Check if the value is the undefined |
| if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) { |
| // update key with better default value |
| defaultData.put(key, value); |
| } |
| } |
| } |
| |
| // ------------------------- Export to different targets ------------------------- |
| |
| /** |
| * Returns a {@link Configuration} object from this {@link ParameterTool}. |
| * |
| * @return A {@link Configuration} |
| */ |
| public Configuration getConfiguration() { |
| Configuration conf = new Configuration(); |
| for (Map.Entry<String, String> entry : data.entrySet()) { |
| conf.setString(entry.getKey(), entry.getValue()); |
| } |
| return conf; |
| } |
| |
| /** |
| * Returns a {@link Properties} object from this {@link ParameterTool}. |
| * |
| * @return A {@link Properties} |
| */ |
| public Properties getProperties() { |
| Properties props = new Properties(); |
| props.putAll(this.data); |
| return props; |
| } |
| |
| /** |
| * Create a properties file with all the known parameters (call after the last get*() call). |
| * Set the default value, if available. |
| * |
| * <p>Use this method to create a properties file skeleton. |
| * |
| * @param pathToFile Location of the default properties file. |
| */ |
| public void createPropertiesFile(String pathToFile) throws IOException { |
| createPropertiesFile(pathToFile, true); |
| } |
| |
| /** |
| * Create a properties file with all the known parameters (call after the last get*() call). |
| * Set the default value, if overwrite is true. |
| * |
| * @param pathToFile Location of the default properties file. |
| * @param overwrite Boolean flag indicating whether or not to overwrite the file |
| * @throws IOException If overwrite is not allowed and the file exists |
| */ |
| public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException { |
| File file = new File(pathToFile); |
| if (file.exists()) { |
| if (overwrite) { |
| file.delete(); |
| } else { |
| throw new RuntimeException("File " + pathToFile + " exists and overwriting is not allowed"); |
| } |
| } |
| Properties defaultProps = new Properties(); |
| defaultProps.putAll(this.defaultData); |
| try (final OutputStream out = new FileOutputStream(file)) { |
| defaultProps.store(out, "Default file created by Flink's ParameterUtil.createPropertiesFile()"); |
| } |
| } |
| |
| @Override |
| protected Object clone() throws CloneNotSupportedException { |
| return new ParameterTool(this.data); |
| } |
| |
| // ------------------------- Interaction with other ParameterUtils ------------------------- |
| |
| /** |
| * Merges two {@link ParameterTool}. |
| * |
| * @param other Other {@link ParameterTool} object |
| * @return The Merged {@link ParameterTool} |
| */ |
| public ParameterTool mergeWith(ParameterTool other) { |
| Map<String, String> resultData = new HashMap<>(data.size() + other.data.size()); |
| resultData.putAll(data); |
| resultData.putAll(other.data); |
| |
| ParameterTool ret = new ParameterTool(resultData); |
| |
| final HashSet<String> requestedParametersLeft = new HashSet<>(data.keySet()); |
| requestedParametersLeft.removeAll(unrequestedParameters); |
| |
| final HashSet<String> requestedParametersRight = new HashSet<>(other.data.keySet()); |
| requestedParametersRight.removeAll(other.unrequestedParameters); |
| |
| ret.unrequestedParameters.removeAll(requestedParametersLeft); |
| ret.unrequestedParameters.removeAll(requestedParametersRight); |
| |
| return ret; |
| } |
| |
| // ------------------------- ExecutionConfig.UserConfig interface ------------------------- |
| |
| @Override |
| public Map<String, String> toMap() { |
| return data; |
| } |
| |
| // ------------------------- Serialization --------------------------------------------- |
| |
| private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { |
| in.defaultReadObject(); |
| |
| defaultData = new ConcurrentHashMap<>(data.size()); |
| unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); |
| } |
| } |