blob: 7518fa08faca19afd90c8b07fe49ca61f5ca8234 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.utils;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.math.NumberUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* This class provides simple utility methods for reading and parsing program arguments from different sources.
*/
@Public
public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable {
/** Version identifier used by Java serialization for this class. */
private static final long serialVersionUID = 1L;
/** Value stored for keys that were passed on the command line without a value (see {@code fromArgs}). */
protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
/** Placeholder recorded in the defaults map when a key is requested without a default value. */
protected static final String DEFAULT_UNDEFINED = "<undefined>";
// ------------------ Constructors ------------------------
/**
* Returns {@link ParameterTool} for the given arguments. The arguments are keys followed by values.
* Keys have to start with '-' or '--'.
*
* <p>A key that is not followed by a value is stored with the value {@link #NO_VALUE_KEY}.
* A '-'-prefixed token that directly follows a key and is accepted by {@code NumberUtils.isNumber}
* is treated as that key's value (e.g. a negative number) instead of as a new key.
* If a key occurs more than once, the last occurrence wins.
*
* <p><strong>Example arguments:</strong>
* --key1 value1 --key2 value2 -key3 value3
*
* @param args Input array arguments
* @return A {@link ParameterTool}
* @throws RuntimeException if a value appears without a preceding key
* @throws IllegalArgumentException if a key is empty, i.e. the argument is just '-' or '--'
*/
public static ParameterTool fromArgs(String[] args) {
Map<String, String> map = new HashMap<String, String>(args.length / 2);
// Parser state: the key currently being processed, its value (once seen), and
// whether the next token is expected to be the value of 'key'.
String key = null;
String value = null;
boolean expectValue = false;
for (String arg : args) {
// check for -- argument
if (arg.startsWith("--")) {
if (expectValue) {
// we ran into a new key while still expecting a value --> the current key is one without value
if (value != null) {
// sanity check: a pending key must not already have a value
throw new IllegalStateException("Unexpected state");
}
map.put(key, NO_VALUE_KEY);
// key will be overwritten in the next step
}
key = arg.substring(2);
expectValue = true;
} // check for - argument
else if (arg.startsWith("-")) {
// we are waiting for a value, so this may be a - prefixed value (negative number)
if (expectValue) {
if (NumberUtils.isNumber(arg)) {
// negative number
value = arg;
expectValue = false;
} else {
if (value != null) {
// sanity check: a pending key must not already have a value
throw new IllegalStateException("Unexpected state");
}
// We waited for a value but found a new key. So the previous key doesn't have a value.
map.put(key, NO_VALUE_KEY);
key = arg.substring(1);
expectValue = true;
}
} else {
// we are not waiting for a value, so it's a key
key = arg.substring(1);
expectValue = true;
}
} else {
// token without '-' prefix: only legal as the value of the pending key
if (expectValue) {
value = arg;
expectValue = false;
} else {
throw new RuntimeException("Error parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected value. Please prefix values with -- or -.");
}
}
// consistency checks on the parser state after consuming the token
if (value == null && key == null) {
throw new IllegalStateException("Value and key can not be null at the same time");
}
if (key != null && value == null && !expectValue) {
throw new IllegalStateException("Value expected but flag not set");
}
// a complete key/value pair: store it and reset the state
if (key != null && value != null) {
map.put(key, value);
key = null;
value = null;
expectValue = false;
}
// a pending key of length zero means the argument was just '-' or '--'
if (key != null && key.length() == 0) {
throw new IllegalArgumentException("The input " + Arrays.toString(args) + " contains an empty argument");
}
if (key != null && !expectValue) {
map.put(key, NO_VALUE_KEY);
key = null;
expectValue = false;
}
}
// a trailing key without a value
if (key != null) {
map.put(key, NO_VALUE_KEY);
}
return fromMap(map);
}
/**
 * Creates a {@link ParameterTool} from the {@link Properties} file at the given path.
 *
 * @param path Path to the properties file
 * @return A {@link ParameterTool}
 * @throws IOException If the file does not exist
 * @see Properties
 */
public static ParameterTool fromPropertiesFile(String path) throws IOException {
    return fromPropertiesFile(new File(path));
}
/**
 * Creates a {@link ParameterTool} from the given {@link Properties} file.
 *
 * <p>The file is opened, handed to the stream-based overload and closed again.
 *
 * @param file File object to the properties file
 * @return A {@link ParameterTool}
 * @throws IOException If the file does not exist ({@link FileNotFoundException}) or reading it fails
 * @see Properties
 */
public static ParameterTool fromPropertiesFile(File file) throws IOException {
    if (file.exists()) {
        try (FileInputStream propertiesStream = new FileInputStream(file)) {
            return fromPropertiesFile(propertiesStream);
        }
    }
    throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist");
}
/**
 * Returns {@link ParameterTool} for the given InputStream in {@link Properties} file format.
 *
 * <p>The stream is read via {@link Properties#load(InputStream)}; it is not closed by this method.
 *
 * @param inputStream InputStream from the properties file
 * @return A {@link ParameterTool}
 * @throws IOException If reading from the stream fails
 * @see Properties
 */
public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
    Properties props = new Properties();
    props.load(inputStream);
    // Copy into a properly typed map instead of raw-casting the Properties object
    // (which is a Map<Object, Object>) to Map<String, String>.
    Map<String, String> entries = new HashMap<>();
    for (String name : props.stringPropertyNames()) {
        entries.put(name, props.getProperty(name));
    }
    return fromMap(entries);
}
/**
 * Creates a {@link ParameterTool} holding the entries of the given map.
 *
 * @param map A map of arguments. Both Key and Value have to be Strings
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromMap(Map<String, String> map) {
    // reject a missing map up front
    Preconditions.checkNotNull(map, "Unable to initialize from empty map");
    final ParameterTool tool = new ParameterTool(map);
    return tool;
}
/**
 * Returns {@link ParameterTool} from the system properties.
 * Example on how to pass system properties:
 * -Dkey1=value1 -Dkey2=value2
 *
 * <p>Only entries whose key and value are both Strings are included.
 *
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromSystemProperties() {
    final Properties systemProperties = System.getProperties();
    // Copy into a typed map rather than raw-casting the Properties object: the system
    // properties are a Map<Object, Object> and may hold non-String entries, which would
    // otherwise surface later as a ClassCastException.
    final Map<String, String> entries = new HashMap<>();
    for (String name : systemProperties.stringPropertyNames()) {
        final String value = systemProperties.getProperty(name);
        // the property may have been removed concurrently after the name snapshot was taken
        if (value != null) {
            entries.put(name, value);
        }
    }
    return fromMap(entries);
}
// ------------------ ParameterTool state ------------------------
// the parameters held by this tool; wrapped in an unmodifiable map by the constructor
protected final Map<String, String> data;
// data which is only used on the client and does not need to be transmitted
// default values recorded by the get methods, keyed by parameter name
protected transient Map<String, String> defaultData;
// keys that have not been requested through has() or one of the get methods
protected transient Set<String> unrequestedParameters;
private ParameterTool(Map<String, String> data) {
this.data = Collections.unmodifiableMap(new HashMap<>(data));
this.defaultData = new ConcurrentHashMap<>(data.size());
this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
unrequestedParameters.addAll(data.keySet());
}
/**
 * Two tools are equal when they have the same runtime class and their parameters, recorded
 * defaults and sets of unrequested keys are all equal.
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null) {
        return false;
    }
    if (getClass() != o.getClass()) {
        return false;
    }
    final ParameterTool other = (ParameterTool) o;
    if (!Objects.equals(data, other.data)) {
        return false;
    }
    if (!Objects.equals(defaultData, other.defaultData)) {
        return false;
    }
    return Objects.equals(unrequestedParameters, other.unrequestedParameters);
}
/**
 * Hash code over the parameters, the recorded defaults and the set of unrequested keys,
 * i.e. the same fields that {@link #equals(Object)} compares.
 */
@Override
public int hashCode() {
    final Object[] fields = {data, defaultData, unrequestedParameters};
    return Arrays.hashCode(fields);
}
/**
 * Returns a read-only view of the parameter names which have not been requested with
 * {@link #has(String)} or one of the {@code get} methods. Access to the
 * map returned by {@link #toMap()} is not tracked.
 */
@PublicEvolving
public Set<String> getUnrequestedParameters() {
    final Set<String> readOnlyView = Collections.unmodifiableSet(unrequestedParameters);
    return readOnlyView;
}
// ------------------ Get data from the util ----------------
/**
 * Returns how many parameters this {@link ParameterTool} holds.
 */
public int getNumberOfParameters() {
    final int parameterCount = this.data.size();
    return parameterCount;
}
/**
 * Looks up the String value for the given key and marks the key as requested.
 *
 * @return the stored value, or {@code null} if the key does not exist
 */
public String get(String key) {
    // remember that the key was asked for, without a default value
    addToDefaults(key, null);
    unrequestedParameters.remove(key);
    final String stored = data.get(key);
    return stored;
}
/**
 * Looks up the String value for the given key, which must be present.
 *
 * @return the stored value, never {@code null}
 * @throws RuntimeException if there is no value for the key
 */
public String getRequired(String key) {
    addToDefaults(key, null);
    final String found = get(key);
    if (found != null) {
        return found;
    }
    throw new RuntimeException("No data for required key '" + key + "'");
}
/**
 * Looks up the String value for the given key, falling back to the given default value
 * when the key does not exist. The default value is recorded for the key.
 */
public String get(String key, String defaultValue) {
    addToDefaults(key, defaultValue);
    final String found = get(key);
    return found != null ? found : defaultValue;
}
/**
 * Checks whether a parameter with the given name is present and marks that name as requested.
 *
 * @param value the parameter name to look for
 * @return {@code true} if the parameter is present
 */
public boolean has(String value) {
    addToDefaults(value, null);
    unrequestedParameters.remove(value);
    final boolean present = data.containsKey(value);
    return present;
}
// -------------- Integer
/**
 * Returns the value for the given key parsed as an {@code int}.
 *
 * @throws RuntimeException if the key does not exist
 * @throws NumberFormatException if the value is not a valid integer
 */
public int getInt(String key) {
    addToDefaults(key, null);
    return Integer.parseInt(getRequired(key));
}
/**
 * Returns the value for the given key parsed as an {@code int}, or the given default value
 * if the key does not exist. The default value is recorded for the key.
 *
 * @throws NumberFormatException if the value is present but not a valid integer
 */
public int getInt(String key, int defaultValue) {
    addToDefaults(key, Integer.toString(defaultValue));
    final String raw = get(key);
    return raw == null ? defaultValue : Integer.parseInt(raw);
}
// -------------- LONG
/**
 * Returns the value for the given key parsed as a {@code long}.
 *
 * @throws RuntimeException if the key does not exist
 * @throws NumberFormatException if the value is not a valid long
 */
public long getLong(String key) {
    addToDefaults(key, null);
    return Long.parseLong(getRequired(key));
}
/**
 * Returns the value for the given key parsed as a {@code long}, or the given default value
 * if the key does not exist. The default value is recorded for the key.
 *
 * @throws NumberFormatException if the value is present but not a valid long
 */
public long getLong(String key, long defaultValue) {
    addToDefaults(key, Long.toString(defaultValue));
    final String raw = get(key);
    return raw == null ? defaultValue : Long.parseLong(raw);
}
// -------------- FLOAT
/**
 * Returns the value for the given key parsed as a {@code float}.
 *
 * @throws RuntimeException if the key does not exist
 * @throws NumberFormatException if the value is not a valid float
 */
public float getFloat(String key) {
    addToDefaults(key, null);
    return Float.parseFloat(getRequired(key));
}
/**
 * Returns the value for the given key parsed as a {@code float}, or the given default value
 * if the key does not exist. The default value is recorded for the key.
 *
 * @throws NumberFormatException if the value is present but not a valid float
 */
public float getFloat(String key, float defaultValue) {
    addToDefaults(key, Float.toString(defaultValue));
    final String raw = get(key);
    if (raw != null) {
        return Float.parseFloat(raw);
    }
    return defaultValue;
}
// -------------- DOUBLE
/**
 * Returns the value for the given key parsed as a {@code double}.
 *
 * @throws RuntimeException if the key does not exist
 * @throws NumberFormatException if the value is not a valid double
 */
public double getDouble(String key) {
    addToDefaults(key, null);
    return Double.parseDouble(getRequired(key));
}
/**
 * Returns the value for the given key parsed as a {@code double}, or the given default value
 * if the key does not exist. The default value is recorded for the key.
 *
 * @throws NumberFormatException if the value is present but not a valid double
 */
public double getDouble(String key, double defaultValue) {
    addToDefaults(key, Double.toString(defaultValue));
    final String raw = get(key);
    if (raw != null) {
        return Double.parseDouble(raw);
    }
    return defaultValue;
}
// -------------- BOOLEAN
/**
 * Returns the value for the given key interpreted as a {@code boolean}: {@code true} if the
 * value equals "true" ignoring case, {@code false} for any other value.
 *
 * @throws RuntimeException if the key does not exist
 */
public boolean getBoolean(String key) {
    addToDefaults(key, null);
    return Boolean.parseBoolean(getRequired(key));
}
/**
 * Returns the value for the given key interpreted as a {@code boolean}, or the given default
 * value if the key does not exist. A present value yields {@code true} only if it equals
 * "true" ignoring case. The default value is recorded for the key.
 */
public boolean getBoolean(String key, boolean defaultValue) {
    addToDefaults(key, Boolean.toString(defaultValue));
    final String raw = get(key);
    if (raw != null) {
        return Boolean.parseBoolean(raw);
    }
    return defaultValue;
}
// -------------- SHORT
/**
 * Returns the value for the given key parsed as a {@code short}.
 *
 * @throws RuntimeException if the key does not exist
 * @throws NumberFormatException if the value is not a valid short
 */
public short getShort(String key) {
    addToDefaults(key, null);
    return Short.parseShort(getRequired(key));
}
/**
 * Returns the value for the given key parsed as a {@code short}, or the given default value
 * if the key does not exist. The default value is recorded for the key.
 *
 * @throws NumberFormatException if the value is present but not a valid short
 */
public short getShort(String key, short defaultValue) {
    addToDefaults(key, Short.toString(defaultValue));
    final String raw = get(key);
    if (raw != null) {
        return Short.parseShort(raw);
    }
    return defaultValue;
}
// -------------- BYTE
/**
 * Returns the value for the given key parsed as a {@code byte}.
 *
 * @throws RuntimeException if the key does not exist
 * @throws NumberFormatException if the value is not a valid byte
 */
public byte getByte(String key) {
    addToDefaults(key, null);
    return Byte.parseByte(getRequired(key));
}
/**
 * Returns the value for the given key parsed as a {@code byte}, or the given default value
 * if the key does not exist. The default value is recorded for the key.
 *
 * @throws NumberFormatException if the value is present but not a valid byte
 */
public byte getByte(String key, byte defaultValue) {
    addToDefaults(key, Byte.toString(defaultValue));
    final String raw = get(key);
    if (raw != null) {
        return Byte.parseByte(raw);
    }
    return defaultValue;
}
// --------------- Internals
/**
 * Records the default value that was supplied when {@code key} was requested.
 *
 * <p>A key requested without a default is recorded as {@link #DEFAULT_UNDEFINED}. An existing
 * entry is only replaced when it is still {@link #DEFAULT_UNDEFINED} and a real default is given.
 *
 * @param key the requested parameter name
 * @param value the default value given by the caller, or {@code null} if there was none
 */
protected void addToDefaults(String key, String value) {
    final String recorded = defaultData.get(key);
    if (recorded != null) {
        // only upgrade the placeholder to a real default; never overwrite a real default
        if (value != null && recorded.equals(DEFAULT_UNDEFINED)) {
            defaultData.put(key, value);
        }
        return;
    }
    // first request for this key
    defaultData.put(key, value == null ? DEFAULT_UNDEFINED : value);
}
// ------------------------- Export to different targets -------------------------
/**
 * Builds a new {@link Configuration} containing every parameter of this {@link ParameterTool}
 * as a String entry.
 *
 * @return A {@link Configuration}
 */
public Configuration getConfiguration() {
    final Configuration configuration = new Configuration();
    for (String name : data.keySet()) {
        configuration.setString(name, data.get(name));
    }
    return configuration;
}
/**
 * Builds a new {@link Properties} object containing every parameter of this {@link ParameterTool}.
 *
 * @return A {@link Properties}
 */
public Properties getProperties() {
    final Properties properties = new Properties();
    for (Map.Entry<String, String> parameter : this.data.entrySet()) {
        properties.put(parameter.getKey(), parameter.getValue());
    }
    return properties;
}
/**
 * Writes a properties file with all the known parameters and their default values, where
 * available (call after the last get*() call). An existing file is overwritten.
 *
 * <p>Use this method to create a properties file skeleton.
 *
 * @param pathToFile Location of the default properties file.
 * @throws IOException If writing the file fails
 */
public void createPropertiesFile(String pathToFile) throws IOException {
    final boolean overwriteExisting = true;
    createPropertiesFile(pathToFile, overwriteExisting);
}
/**
 * Writes a properties file with all the known parameters and their default values, where
 * available (call after the last get*() call).
 *
 * @param pathToFile Location of the default properties file.
 * @param overwrite Boolean flag indicating whether or not to overwrite the file
 * @throws IOException If writing the file fails
 * @throws RuntimeException If the file exists and overwriting is not allowed
 */
public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException {
    final File target = new File(pathToFile);
    if (target.exists()) {
        if (!overwrite) {
            throw new RuntimeException("File " + pathToFile + " exists and overwriting is not allowed");
        }
        target.delete();
    }
    // the file content is the recorded defaults, not the actual parameter values
    final Properties recordedDefaults = new Properties();
    recordedDefaults.putAll(this.defaultData);
    try (final OutputStream sink = new FileOutputStream(target)) {
        recordedDefaults.store(sink, "Default file created by Flink's ParameterUtil.createPropertiesFile()");
    }
}
/**
 * Creates a new {@link ParameterTool} with the same parameters. The copy is built through the
 * constructor, so it starts with no recorded defaults and all keys marked as unrequested.
 */
@Override
protected Object clone() throws CloneNotSupportedException {
    final ParameterTool copy = new ParameterTool(this.data);
    return copy;
}
// ------------------------- Interaction with other ParameterTools -------------------------
/**
 * Merges this {@link ParameterTool} with another one into a new instance.
 *
 * <p>If both tools contain the same key, the value of {@code other} wins. A key counts as
 * requested in the result if it was already requested in either of the two inputs.
 *
 * @param other Other {@link ParameterTool} object
 * @return The Merged {@link ParameterTool}
 */
public ParameterTool mergeWith(ParameterTool other) {
    final Map<String, String> combined = new HashMap<>(data.size() + other.data.size());
    combined.putAll(data);
    combined.putAll(other.data);
    final ParameterTool merged = new ParameterTool(combined);

    // requested keys = all keys minus the ones still unrequested, computed per input
    final Set<String> requestedHere = new HashSet<>(data.keySet());
    requestedHere.removeAll(unrequestedParameters);
    final Set<String> requestedThere = new HashSet<>(other.data.keySet());
    requestedThere.removeAll(other.unrequestedParameters);

    // the merged tool starts with every key unrequested; carry over what was already requested
    merged.unrequestedParameters.removeAll(requestedHere);
    merged.unrequestedParameters.removeAll(requestedThere);
    return merged;
}
// ------------------------- ExecutionConfig.GlobalJobParameters interface -------------------------
/**
 * Returns the parameters as a map. This is the unmodifiable map held by this tool,
 * not a copy.
 */
@Override
public Map<String, String> toMap() {
    return this.data;
}
// ------------------------- Serialization ---------------------------------------------
/**
 * Restores the instance from the stream and re-creates the transient tracking state.
 *
 * <p>The transient fields are not part of the serialized form, so they are rebuilt the same way
 * the constructor builds them: no recorded defaults, and every key marked as unrequested.
 *
 * @param in the stream to read the object from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if the class of a serialized object cannot be found
 */
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    defaultData = new ConcurrentHashMap<>(data.size());
    unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
    // nothing has been requested on the restored instance yet, so all keys are unrequested
    unrequestedParameters.addAll(data.keySet());
}
}