[FLINK-3904] enhancements to GlobalConfiguration
- fail if config couldn't be loaded
- remove duplicate api methods
- remove undocumented XML loading feature
- generate yaml conf in tests instead of xml conf
- only load one config file instead of all xml or yaml files (flink-conf.yaml)
- make GlobalConfiguration non-global and remove static SINGLETON
- fix test cases
- add test cases
This closes #2123
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a888841..7c2ee2e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -153,9 +153,7 @@
// load the configuration
LOG.info("Trying to load configuration file");
- GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
- System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, configDirectory.getAbsolutePath());
- this.config = GlobalConfiguration.getConfiguration();
+ this.config = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
try {
FileSystem.setDefaultScheme(config);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 41d8622..8320e04 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -48,11 +48,6 @@
CliFrontendTestUtils.pipeSystemOutToNull();
}
- @Before
- public void clearConfig() {
- CliFrontendTestUtils.clearGlobalConfiguration();
- }
-
@Test
public void testValidConfig() {
try {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 736d859..524e7e7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -55,7 +55,6 @@
@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
- CliFrontendTestUtils.clearGlobalConfiguration();
}
@Test
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index f710d8e..0326eab 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -36,7 +36,6 @@
@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
- CliFrontendTestUtils.clearGlobalConfiguration();
}
@Test
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index 7c34c75..9522ac7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -35,7 +35,6 @@
import java.util.UUID;
import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration;
import static org.junit.Assert.*;
public class CliFrontendStopTest extends TestLogger {
@@ -45,7 +44,6 @@
@BeforeClass
public static void setup() {
pipeSystemOutToNull();
- clearGlobalConfiguration();
actorSystem = ActorSystem.create("TestingActorSystem");
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 1872133..c411a7b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -71,33 +71,7 @@
System.setOut(new PrintStream(new BlackholeOutputSteam()));
System.setErr(new PrintStream(new BlackholeOutputSteam()));
}
-
- public static void clearGlobalConfiguration() {
- try {
- Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("SINGLETON");
- Field conf = GlobalConfiguration.class.getDeclaredField("config");
- Field map = Configuration.class.getDeclaredField("confData");
-
- singletonInstanceField.setAccessible(true);
- conf.setAccessible(true);
- map.setAccessible(true);
-
- GlobalConfiguration gconf = (GlobalConfiguration) singletonInstanceField.get(null);
- if (gconf != null) {
- Configuration confObject = (Configuration) conf.get(gconf);
- @SuppressWarnings("unchecked")
- Map<String, Object> confData = (Map<String, Object>) map.get(confObject);
- confData.clear();
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test initialization caused an exception: " + e.getMessage());
- }
-
- }
-
+
private static final class BlackholeOutputSteam extends java.io.OutputStream {
@Override
public void write(int b){}
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 6ad250d..9628bb7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -242,7 +242,7 @@
}
}
- final Configuration configuration = GlobalConfiguration.getConfiguration();
+ final Configuration configuration = GlobalConfiguration.loadConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
@@ -271,7 +271,7 @@
* @return Flink's internally used {@link JobID}.
*/
JobID getTopologyJobId(final String id) {
- final Configuration configuration = GlobalConfiguration.getConfiguration();
+ final Configuration configuration = GlobalConfiguration.loadConfiguration();
if (this.timeout != null) {
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
}
@@ -311,7 +311,7 @@
}
private FiniteDuration getTimeout() {
- final Configuration configuration = GlobalConfiguration.getConfiguration();
+ final Configuration configuration = GlobalConfiguration.loadConfiguration();
if (this.timeout != null) {
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
}
@@ -320,7 +320,7 @@
}
private ActorRef getJobManager() throws IOException {
- final Configuration configuration = GlobalConfiguration.getConfiguration();
+ final Configuration configuration = GlobalConfiguration.loadConfiguration();
ActorSystem actorSystem;
try {
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index 13a39ef..f8932b1 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -87,7 +87,7 @@
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
- final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
+ final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
stormConf.put(Config.NIMBUS_HOST,
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index a89e73e..059198c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -45,7 +45,7 @@
private transient DataOutputViewStreamWrapper outView;
-
+
@Override
public void close() throws IOException {
try {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 99aa022..fd02c82 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,13 +20,13 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
@@ -84,12 +84,18 @@
*/
private static int MAX_SAMPLE_LEN;
- static { loadGlobalConfigParams(); }
-
+	/**
+	 * @deprecated Please use {@code loadConfigParameters(Configuration config)} instead.
+	 */
+ @Deprecated
protected static void loadGlobalConfigParams() {
- int maxSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
+ loadConfigParameters(GlobalConfiguration.loadConfiguration());
+ }
+
+ protected static void loadConfigParameters(Configuration parameters) {
+ int maxSamples = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES);
- int minSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
+ int minSamples = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MIN_LINE_SAMPLES);
if (maxSamples < 0) {
@@ -113,7 +119,7 @@
DEFAULT_MIN_NUM_SAMPLES = minSamples;
}
- int maxLen = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY,
+ int maxLen = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY,
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN);
if (maxLen <= 0) {
maxLen = ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN;
@@ -164,13 +170,17 @@
// --------------------------------------------------------------------------------------------
// Constructors & Getters/setters for the configurable parameters
// --------------------------------------------------------------------------------------------
-
+
public DelimitedInputFormat() {
- super();
+ this(null, null);
}
-
- protected DelimitedInputFormat(Path filePath) {
+
+ protected DelimitedInputFormat(Path filePath, Configuration configuration) {
super(filePath);
+ if (configuration == null) {
+ configuration = GlobalConfiguration.loadConfiguration();
+ }
+ loadConfigParameters(configuration);
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 95a1ffa..72d6061 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -87,15 +87,19 @@
* The splitLength is set to -1L for reading the whole split.
*/
protected static final long READ_WHOLE_SPLIT_FLAG = -1L;
-
+
static {
- initDefaultsFromConfiguration();
+ initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
initDefaultInflaterInputStreamFactories();
}
-
- private static void initDefaultsFromConfiguration() {
-
- final long to = GlobalConfiguration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
+
+ /**
+ * Initialize defaults for input format. Needs to be a static method because it is configured for local
+ * cluster execution, see LocalFlinkMiniCluster.
+ * @param configuration The configuration to load defaults from
+ */
+ private static void initDefaultsFromConfiguration(Configuration configuration) {
+ final long to = configuration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
if (to < 0) {
LOG.error("Invalid timeout value for filesystem stream opening: " + to + ". Using default value of " +
@@ -154,10 +158,6 @@
}
}
- static long getDefaultOpeningTimeout() {
- return DEFAULT_OPENING_TIMEOUT;
- }
-
// --------------------------------------------------------------------------------------------
// Variables for internal operation.
// They are all transient, because we do not want them so be serialized
@@ -224,11 +224,8 @@
// --------------------------------------------------------------------------------------------
public FileInputFormat() {}
-
+
protected FileInputFormat(Path filePath) {
- if (filePath == null) {
- throw new IllegalArgumentException("The file path must not be null.");
- }
this.filePath = filePath;
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 557c342..7530ba1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -22,11 +22,11 @@
import java.io.IOException;
import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -62,26 +62,30 @@
private static WriteMode DEFAULT_WRITE_MODE;
private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
-
-
- private static final void initDefaultsFromConfiguration(Configuration configuration) {
- final boolean overwrite = configuration.getBoolean(ConfigConstants
- .FILESYSTEM_DEFAULT_OVERWRITE_KEY,
+
+ static {
+ initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
+ }
+
+ /**
+ * Initialize defaults for output format. Needs to be a static method because it is configured for local
+ * cluster execution, see LocalFlinkMiniCluster.
+ * @param configuration The configuration to load defaults from
+ */
+ private static void initDefaultsFromConfiguration(Configuration configuration) {
+ final boolean overwrite = configuration.getBoolean(
+ ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;
- final boolean alwaysCreateDirectory = configuration.getBoolean(ConfigConstants
- .FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
+ final boolean alwaysCreateDirectory = configuration.getBoolean(
+ ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);
DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
}
-
- static {
- initDefaultsFromConfiguration(GlobalConfiguration.getConfiguration());
- }
-
+
// --------------------------------------------------------------------------------------------
/**
@@ -121,9 +125,9 @@
private transient boolean fileCreated;
// --------------------------------------------------------------------------------------------
-
+
public FileOutputFormat() {}
-
+
public FileOutputFormat(Path outputPath) {
this.outputFilePath = outputPath;
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index e2c54ad..85d9cd8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -98,15 +98,15 @@
// --------------------------------------------------------------------------------------------
// Constructors and getters/setters for the configurable parameters
// --------------------------------------------------------------------------------------------
-
+
protected GenericCsvInputFormat() {
super();
}
-
+
protected GenericCsvInputFormat(Path filePath) {
- super(filePath);
+ super(filePath, null);
}
-
+
// --------------------------------------------------------------------------------------------
public int getNumberOfFieldsTotal() {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
index 0e978b9..300c237 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
@@ -70,7 +70,7 @@
* <p>
* This method is always called first on a newly instantiated input format.
*
- * @param parameters The configuration with all parameters.
+ * @param parameters The configuration with all parameters (note: not the Flink config but the TaskConfig).
*/
void configure(Configuration parameters);
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
index 0ef160e..edbe1a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
@@ -33,7 +33,7 @@
public class SerializedOutputFormat<T extends IOReadableWritable> extends BinaryOutputFormat<T> {
private static final long serialVersionUID = 1L;
-
+
@Override
protected void serialize(T record, DataOutputView dataOutputView) throws IOException {
record.write(dataOutputView);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 7e50486..8d550d7 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -21,21 +21,12 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
import org.apache.flink.annotation.Internal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
/**
* Global configuration object for Flink. Similar to Java properties configuration
@@ -44,159 +35,62 @@
@Internal
public final class GlobalConfiguration {
- /** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);
- /** The global configuration object accessible through a singleton pattern. */
- private static GlobalConfiguration SINGLETON = null;
-
- /** The internal map holding the key-value pairs the configuration consists of. */
- private final Configuration config = new Configuration();
+ public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
// --------------------------------------------------------------------------------------------
-
- /**
- * Retrieves the singleton object of the global configuration.
- *
- * @return the global configuration object
- */
- private static GlobalConfiguration get() {
- // lazy initialization currently only for testibility
- synchronized (GlobalConfiguration.class) {
- if (SINGLETON == null) {
- SINGLETON = new GlobalConfiguration();
- }
- return SINGLETON;
- }
- }
- /**
- * The constructor used to construct the singleton instance of the global configuration.
- */
private GlobalConfiguration() {}
// --------------------------------------------------------------------------------------------
-
- /**
- * Returns the value associated with the given key as a string.
- *
- * @param key
- * the key pointing to the associated value
- * @param defaultValue
- * the default value which is returned in case there is no value associated with the given key
- * @return the (default) value associated with the given key
- */
- public static String getString(String key, String defaultValue) {
- return get().config.getString(key, defaultValue);
- }
/**
- * Returns the value associated with the given key as a long integer.
- *
- * @param key
- * the key pointing to the associated value
- * @param defaultValue
- * the default value which is returned in case there is no value associated with the given key
- * @return the (default) value associated with the given key
+ * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
+ * empty configuration object if the environment variable is not set. In production this variable is set but
+ * tests and local execution/debugging don't have this environment variable set. That's why we should fail
+ * if it is not set.
+	 * @return the loaded {@link Configuration}
*/
- public static long getLong(String key, long defaultValue) {
- return get().config.getLong(key, defaultValue);
- }
-
- /**
- * Returns the value associated with the given key as an integer.
- *
- * @param key
- * the key pointing to the associated value
- * @param defaultValue
- * the default value which is returned in case there is no value associated with the given key
- * @return the (default) value associated with the given key
- */
- public static int getInteger(String key, int defaultValue) {
- return get().config.getInteger(key, defaultValue);
- }
-
- /**
- * Returns the value associated with the given key as a float.
- *
- * @param key
- * the key pointing to the associated value
- * @param defaultValue
- * the default value which is returned in case there is no value associated with the given key
- * @return the (default) value associated with the given key
- */
- public static float getFloat(String key, float defaultValue) {
- return get().config.getFloat(key, defaultValue);
- }
-
- /**
- * Returns the value associated with the given key as a boolean.
- *
- * @param key
- * the key pointing to the associated value
- * @param defaultValue
- * the default value which is returned in case there is no value associated with the given key
- * @return the (default) value associated with the given key
- */
- public static boolean getBoolean(String key, boolean defaultValue) {
- return get().config.getBoolean(key, defaultValue);
+ public static Configuration loadConfiguration() {
+ final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+ if (configDir == null) {
+ return new Configuration();
+ }
+ return loadConfiguration(configDir);
}
/**
* Loads the configuration files from the specified directory.
* <p>
- * XML and YAML are supported as configuration files. If both XML and YAML files exist in the configuration
- * directory, keys from YAML will overwrite keys from XML.
+ * YAML files are supported as configuration files.
*
* @param configDir
* the directory which contains the configuration files
*/
- public static void loadConfiguration(final String configDir) {
+ public static Configuration loadConfiguration(final String configDir) {
if (configDir == null) {
- LOG.warn("Given configuration directory is null, cannot load configuration");
- return;
+ throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
}
final File confDirFile = new File(configDir);
if (!(confDirFile.exists())) {
- LOG.warn("The given configuration directory name '" + configDir + "' (" + confDirFile.getAbsolutePath()
- + ") does not describe an existing directory.");
- return;
- }
-
- if (confDirFile.isFile()) {
- final File file = new File(configDir);
- if(configDir.endsWith(".xml")) {
- get().loadXMLResource( file );
- } else if(configDir.endsWith(".yaml")) {
- get().loadYAMLResource(file);
- } else {
- LOG.warn("The given configuration has an unknown extension.");
- return;
- }
- return;
+ throw new IllegalConfigurationException(
+ "The given configuration directory name '" + configDir +
+ "' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory.");
}
- // get all XML and YAML files in the directory
- final File[] xmlFiles = filterFilesBySuffix(confDirFile, ".xml");
- final File[] yamlFiles = filterFilesBySuffix(confDirFile, new String[] { ".yaml", ".yml" });
+ // get Flink yaml configuration file
+ final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
- if ((xmlFiles == null || xmlFiles.length == 0) && (yamlFiles == null || yamlFiles.length == 0)) {
- LOG.warn("Unable to get the contents of the config directory '" + configDir + "' ("
- + confDirFile.getAbsolutePath() + ").");
- return;
+ if (!yamlConfigFile.exists()) {
+ throw new IllegalConfigurationException(
+ "The Flink config file '" + yamlConfigFile +
+ "' (" + confDirFile.getAbsolutePath() + ") does not exist.");
}
- // load config files and write into config map
- for (File f : xmlFiles) {
- get().loadXMLResource(f);
- }
-
- // => if both XML and YAML files exist, the YAML config keys overwrite XML settings
- for (File f : yamlFiles) {
- get().loadYAMLResource(f);
- }
+ return loadYAMLResource(yamlConfigFile);
}
/**
@@ -219,237 +113,46 @@
* @param file the YAML file to read from
* @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
*/
- private void loadYAMLResource(File file) {
+ private static Configuration loadYAMLResource(File file) {
+ final Configuration config = new Configuration();
- synchronized (getClass()) {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))){
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
-
- String line = null;
- while ((line = reader.readLine()) != null) {
-
- // 1. check for comments
- String[] comments = line.split("#", 2);
- String conf = comments[0];
-
- // 2. get key and value
- if (conf.length() > 0) {
- String[] kv = conf.split(": ", 2);
-
- // skip line with no valid key-value pair
- if (kv.length == 1) {
- LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
- continue;
- }
-
- String key = kv[0].trim();
- String value = kv[1].trim();
-
- // sanity check
- if (key.length() == 0 || value.length() == 0) {
- LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
- continue;
- }
-
- LOG.debug("Loading configuration property: {}, {}", key, value);
-
- this.config.setString(key, value);
- }
- }
- }
- catch (IOException e) {
- LOG.error("Error parsing YAML configuration.", e);
- }
- finally {
- try {
- if(reader != null) {
- reader.close();
- }
- } catch (IOException e) {
- LOG.warn("Cannot to close reader with IOException.", e);
- }
- }
- }
- }
+ String line;
+ while ((line = reader.readLine()) != null) {
- /**
- * Loads an XML document of key-values pairs.
- *
- * @param file
- * the XML document file
- */
- private void loadXMLResource(File file) {
+ // 1. check for comments
+ String[] comments = line.split("#", 2);
+ String conf = comments[0];
- final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
- // Ignore comments in the XML file
- docBuilderFactory.setIgnoringComments(true);
- docBuilderFactory.setNamespaceAware(true);
+ // 2. get key and value
+ if (conf.length() > 0) {
+ String[] kv = conf.split(": ", 2);
- try {
-
- final DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
- Document doc;
- Element root;
-
- doc = builder.parse(file);
-
- if (doc == null) {
- LOG.warn("Cannot load configuration: doc is null");
- return;
- }
-
- root = doc.getDocumentElement();
- if (root == null) {
- LOG.warn("Cannot load configuration: root is null");
- return;
- }
-
- if (!"configuration".equals(root.getNodeName())) {
- return;
- }
-
- final NodeList props = root.getChildNodes();
- int propNumber = -1;
-
- synchronized (getClass()) {
-
- for (int i = 0; i < props.getLength(); i++) {
-
- final Node propNode = props.item(i);
- String key = null;
- String value = null;
-
- // Ignore text at this point
- if (propNode instanceof Text) {
+ // skip line with no valid key-value pair
+ if (kv.length == 1) {
+ LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
continue;
}
- if (!(propNode instanceof Element)) {
+ String key = kv[0].trim();
+ String value = kv[1].trim();
+
+ // sanity check
+ if (key.length() == 0 || value.length() == 0) {
+ LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
continue;
}
- Element property = (Element) propNode;
- if (!"property".equals(property.getNodeName())) {
- continue;
- }
-
- propNumber++;
- final NodeList propChildren = property.getChildNodes();
- if (propChildren == null) {
- LOG.warn("Error while reading configuration: property has no children, skipping...");
- continue;
- }
-
- for (int j = 0; j < propChildren.getLength(); j++) {
-
- final Node propChild = propChildren.item(j);
- if (propChild instanceof Element) {
- if ("key".equals(propChild.getNodeName())) {
- if (propChild.getChildNodes() != null) {
- if (propChild.getChildNodes().getLength() == 1) {
- if (propChild.getChildNodes().item(0) instanceof Text) {
- final Text t = (Text) propChild.getChildNodes().item(0);
- key = t.getTextContent();
- }
- }
- }
- }
-
- if ("value".equals(propChild.getNodeName())) {
- if (propChild.getChildNodes() != null) {
- if (propChild.getChildNodes().getLength() == 1) {
- if (propChild.getChildNodes().item(0) instanceof Text) {
- final Text t = (Text) propChild.getChildNodes().item(0);
- value = t.getTextContent();
- }
- }
- }
- }
- }
- }
-
- if (key != null && value != null) {
- // Put key, value pair into the map
- LOG.debug("Loading configuration property: {}, {}", key, value);
- this.config.setString(key, value);
- } else {
- LOG.warn("Error while reading configuration: Cannot read property " + propNumber);
- }
+ LOG.debug("Loading configuration property: {}, {}", key, value);
+ config.setString(key, value);
}
}
-
+ } catch (IOException e) {
+ throw new RuntimeException("Error parsing YAML configuration.", e);
}
- catch (Exception e) {
- LOG.error("Cannot load configuration.", e);
- }
+
+ return config;
}
- /**
- * Gets a {@link Configuration} object with the values of this GlobalConfiguration
- *
- * @return the {@link Configuration} object including the key/value pairs
- */
- public static Configuration getConfiguration() {
- Configuration copy = new Configuration();
- copy.addAll(get().config);
- return copy;
- }
-
- /**
- * Merges the given {@link Configuration} object into the global
- * configuration. If a key/value pair with an identical already
- * exists in the global configuration, it is overwritten by the
- * pair of the {@link Configuration} object.
- *
- * @param conf
- * the {@link Configuration} object to merge into the global configuration
- */
- public static void includeConfiguration(Configuration conf) {
- get().includeConfigurationInternal(conf);
- }
-
- /**
- * Internal non-static method to include configuration.
- *
- * @param conf
- * the {@link Configuration} object to merge into the global configuration
- */
- private void includeConfigurationInternal(Configuration conf) {
- // static synchronized
- synchronized (getClass()) {
- this.config.addAll(conf);
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Filters files in directory which have the specified suffix (e.g. ".xml").
- *
- * @param dirToFilter
- * directory to filter
- * @param suffix
- * suffix to filter files by (e.g. ".xml")
- * @return files with given ending in directory
- */
- private static File[] filterFilesBySuffix(final File dirToFilter, final String suffix) {
- return filterFilesBySuffix(dirToFilter, new String[] { suffix });
- }
-
- private static File[] filterFilesBySuffix(final File dirToFilter, final String[] suffixes) {
- return dirToFilter.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(final File dir, final String name) {
- for (String suffix : suffixes) {
- if (dir.equals(dirToFilter) && name != null && name.endsWith(suffix)) {
- return true;
- }
- }
-
- return false;
- }
- });
- }
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index 90b366c..a7374e3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -41,7 +41,8 @@
return record;
}
}
-
+
+
@Test
public void testCreateInputSplitsWithOneFile() throws IOException {
// create temporary file with 3 blocks
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
index fac979e..be73798 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
@@ -30,6 +30,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
+
public class DelimitedInputFormatSamplingTest {
private static final String TEST_DATA1 =
@@ -66,6 +67,8 @@
private static final int DEFAULT_NUM_SAMPLES = 4;
+ private static Configuration CONFIG;
+
// ========================================================================
// Setup
// ========================================================================
@@ -80,16 +83,17 @@
try {
// make sure we do 4 samples
- TestConfigUtils.loadGlobalConf(
+ CONFIG = TestConfigUtils.loadGlobalConf(
new String[] { ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY },
new String[] { "4", "4" });
-
- TestDelimitedInputFormat.prepare();
+
+
} catch (Throwable t) {
Assert.fail("Could not load the global configuration.");
}
}
+
// ========================================================================
// Tests
@@ -101,7 +105,7 @@
final String tempFile = TestFileUtils.createTempFile(TEST_DATA1);
final Configuration conf = new Configuration();
- final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
format.setFilePath(tempFile.replace("file", "test"));
format.configure(conf);
@@ -109,7 +113,7 @@
format.getStatistics(null);
Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
- TestDelimitedInputFormat format2 = new TestDelimitedInputFormat();
+ TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG);
format2.setFilePath(tempFile.replace("file", "test"));
format2.setNumLineSamples(8);
format2.configure(conf);
@@ -130,7 +134,7 @@
final String tempFile = TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA1, TEST_DATA1, TEST_DATA1);
final Configuration conf = new Configuration();
- final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
format.setFilePath(tempFile.replace("file", "test"));
format.configure(conf);
@@ -138,7 +142,7 @@
format.getStatistics(null);
Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
- TestDelimitedInputFormat format2 = new TestDelimitedInputFormat();
+ TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG);
format2.setFilePath(tempFile.replace("file", "test"));
format2.setNumLineSamples(8);
format2.configure(conf);
@@ -159,7 +163,7 @@
final String tempFile = TestFileUtils.createTempFile(TEST_DATA1);
final Configuration conf = new Configuration();
- final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
format.setFilePath(tempFile);
format.configure(conf);
BaseStatistics stats = format.getStatistics(null);
@@ -180,7 +184,7 @@
final String tempFile = TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA2);
final Configuration conf = new Configuration();
- final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
format.setFilePath(tempFile);
format.configure(conf);
BaseStatistics stats = format.getStatistics(null);
@@ -212,7 +216,7 @@
final String tempFile = TestFileUtils.createTempFile(testData);
final Configuration conf = new Configuration();
- final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
format.setFilePath(tempFile);
format.setDelimiter(DELIMITER);
format.configure(conf);
@@ -235,7 +239,7 @@
final String tempFile = TestFileUtils.createTempFile(2 * ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN);
final Configuration conf = new Configuration();
- final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
format.setFilePath(tempFile);
format.configure(conf);
@@ -252,7 +256,7 @@
final String tempFile = TestFileUtils.createTempFile(TEST_DATA1);
final Configuration conf = new Configuration();
- final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
format.setFilePath("test://" + tempFile);
format.configure(conf);
@@ -260,7 +264,7 @@
BaseStatistics stats = format.getStatistics(null);
Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
- final TestDelimitedInputFormat format2 = new TestDelimitedInputFormat();
+ final TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG);
format2.setFilePath("test://" + tempFile);
format2.configure(conf);
@@ -274,21 +278,21 @@
Assert.fail(e.getMessage());
}
}
-
+
// ========================================================================
// Mocks
// ========================================================================
-
+
private static final class TestDelimitedInputFormat extends DelimitedInputFormat<IntValue> {
private static final long serialVersionUID = 1L;
-
+
+ TestDelimitedInputFormat(Configuration configuration) {
+ super(null, configuration);
+ }
+
@Override
public IntValue readRecord(IntValue reuse, byte[] bytes, int offset, int numBytes) {
throw new UnsupportedOperationException();
}
-
- public static void prepare() {
- DelimitedInputFormat.loadGlobalConfigParams();
- }
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 599a640..8a31099 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -38,6 +38,7 @@
import java.util.List;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -47,17 +48,18 @@
public class DelimitedInputFormatTest {
- private final DelimitedInputFormat<String> format = new MyTextInputFormat();
+ private DelimitedInputFormat<String> format;
// --------------------------------------------------------------------------------------------
@Before
public void setup() {
+ format = new MyTextInputFormat();
this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read"));
}
@After
- public void setdown() throws Exception {
+ public void shutdown() throws Exception {
if (this.format != null) {
this.format.close();
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index 68465a3..1076338 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.testutils.TestFileUtils;
@@ -37,11 +38,12 @@
protected Configuration config;
final String tempPath = System.getProperty("java.io.tmpdir");
- private final DummyFileInputFormat format = new DummyFileInputFormat();
+ private DummyFileInputFormat format;
@Before
public void setup() {
this.config = new Configuration();
+ format = new DummyFileInputFormat();
}
@After
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index cc040b6..4a598f2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -22,13 +22,12 @@
import java.io.File;
import java.io.IOException;
-import org.junit.Assert;
-
import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.types.IntValue;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.fail;
@@ -38,11 +37,8 @@
@Test
public void testCreateNonParallelLocalFS() throws IOException {
- File tmpOutPath = null;
- File tmpOutFile = null;
-
- tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
- tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+ File tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
+ File tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
String tmpFilePath = tmpOutPath.toURI().toString();
@@ -652,8 +648,10 @@
// -------------------------------------------------------------------------------------------
public static class DummyFileOutputFormat extends FileOutputFormat<IntValue> {
+
private static final long serialVersionUID = 1L;
public boolean testFileName = false;
+
@Override
public void writeRecord(IntValue record) throws IOException {
// DO NOTHING
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
index ae0f8e5..c3cbb58 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
@@ -52,4 +52,5 @@
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
}
+
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
index 296af11..4c303a6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
@@ -52,4 +52,5 @@
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
}
+
}
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
index 3e1a723..0cf5e32 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
@@ -21,7 +21,6 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.junit.Before;
import org.junit.Test;
import java.io.File;
@@ -38,15 +37,6 @@
public class FilesystemSchemeConfigTest {
- @Before
- public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
- IllegalAccessException {
- // reset GlobalConfiguration between tests
- Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
- instance.setAccessible(true);
- instance.set(null, null);
- }
-
@Test
public void testExplicitFilesystemScheme() {
testSettingFilesystemScheme(false, "fs.default-scheme: otherFS://localhost:1234/", true);
@@ -65,7 +55,12 @@
private void testSettingFilesystemScheme(boolean useDefaultScheme,
String configFileScheme, boolean useExplicitScheme) {
final File tmpDir = getTmpDir();
- final File confFile = createRandomFile(tmpDir, ".yaml");
+ final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME);
+ try {
+ confFile.createNewFile();
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't create file", e);
+ }
final File testFile = new File(tmpDir.getAbsolutePath() + File.separator + "testing.txt");
try {
@@ -83,8 +78,7 @@
fail(e.getMessage());
}
- GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
- Configuration conf = GlobalConfiguration.getConfiguration();
+ Configuration conf = GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
try {
FileSystem.setDefaultScheme(conf);
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index ce55d2e..6336a73 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -19,114 +19,58 @@
package org.apache.flink.configuration;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.PrintWriter;
-import java.lang.reflect.Field;
import java.util.UUID;
-import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
-import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
/**
* This class contains tests for the global configuration (parsing configuration directory information).
*/
public class GlobalConfigurationTest extends TestLogger {
- @Before
- public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
- IllegalAccessException {
- // reset GlobalConfiguration between tests
- Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
- instance.setAccessible(true);
- instance.set(null, null);
- }
-
- @Test
- public void testConfigurationMixed() {
- File tmpDir = getTmpDir();
- File confFile1 = createRandomFile(tmpDir, ".yaml");
- File confFile2 = createRandomFile(tmpDir, ".xml");
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
- try {
- try {
- PrintWriter pw1 = new PrintWriter(confFile1);
- PrintWriter pw2 = new PrintWriter(confFile2);
-
- pw1.println("mykey1: myvalue1_YAML");
- pw1.println("mykey2: myvalue2");
-
- pw2.println("<configuration>");
- pw2.println("<property><key>mykey1</key><value>myvalue1_XML</value></property>");
- pw2.println("<property><key>mykey3</key><value>myvalue3</value></property>");
- pw2.println("</configuration>");
-
- pw1.close();
- pw2.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- }
-
- GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
- Configuration conf = GlobalConfiguration.getConfiguration();
-
- // all distinct keys from confFile1 + confFile2key
- assertEquals(3, conf.keySet().size());
-
- // keys 1, 2, 3 should be OK and match the expected values
- // => configuration keys from YAML should overwrite keys from XML
- assertEquals("myvalue1_YAML", conf.getString("mykey1", null));
- assertEquals("myvalue2", conf.getString("mykey2", null));
- assertEquals("myvalue3", conf.getString("mykey3", null));
- } finally {
- confFile1.delete();
- confFile2.delete();
- tmpDir.delete();
- }
- }
-
@Test
public void testConfigurationYAML() {
- File tmpDir = getTmpDir();
- File confFile1 = createRandomFile(tmpDir, ".yaml");
- File confFile2 = createRandomFile(tmpDir, ".yml");
+ File tmpDir = tempFolder.getRoot();
+ File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME);
try {
- try {
- PrintWriter pw1 = new PrintWriter(confFile1);
- PrintWriter pw2 = new PrintWriter(confFile2);
+ try (final PrintWriter pw = new PrintWriter(confFile)) {
- pw1.println("###########################"); // should be skipped
- pw1.println("# Some : comments : to skip"); // should be skipped
- pw1.println("###########################"); // should be skipped
- pw1.println("mykey1: myvalue1"); // OK, simple correct case
- pw1.println("mykey2 : myvalue2"); // OK, whitespace before colon is correct
- pw1.println("mykey3:myvalue3"); // SKIP, missing white space after colon
- pw1.println(" some nonsense without colon and whitespace separator"); // SKIP
- pw1.println(" : "); // SKIP
- pw1.println(" "); // SKIP
- pw1.println("mykey4: myvalue4# some comments"); // OK, skip comments only
- pw1.println(" mykey5 : myvalue5 "); // OK, trim unnecessary whitespace
- pw1.println("mykey6: my: value6"); // OK, only use first ': ' as separator
- pw1.println("mykey7: "); // SKIP, no value provided
- pw1.println(": myvalue8"); // SKIP, no key provided
+ pw.println("###########################"); // should be skipped
+ pw.println("# Some : comments : to skip"); // should be skipped
+ pw.println("###########################"); // should be skipped
+ pw.println("mykey1: myvalue1"); // OK, simple correct case
+ pw.println("mykey2 : myvalue2"); // OK, whitespace before colon is correct
+ pw.println("mykey3:myvalue3"); // SKIP, missing white space after colon
+ pw.println(" some nonsense without colon and whitespace separator"); // SKIP
+ pw.println(" : "); // SKIP
+ pw.println(" "); // SKIP
+ pw.println("mykey4: myvalue4# some comments"); // OK, skip comments only
+ pw.println(" mykey5 : myvalue5 "); // OK, trim unnecessary whitespace
+ pw.println("mykey6: my: value6"); // OK, only use first ': ' as separator
+ pw.println("mykey7: "); // SKIP, no value provided
+ pw.println(": myvalue8"); // SKIP, no key provided
- pw2.println("mykey9: myvalue9"); // OK
- pw2.println("mykey9: myvalue10"); // OK, overwrite last value
+ pw.println("mykey9: myvalue9"); // OK
+ pw.println("mykey9: myvalue10"); // OK, overwrite last value
- pw1.close();
- pw2.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
}
- GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
- Configuration conf = GlobalConfiguration.getConfiguration();
+ Configuration conf = GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
// all distinct keys from confFile1 + confFile2 key
assertEquals(6, conf.keySet().size());
@@ -142,83 +86,36 @@
assertEquals("null", conf.getString("mykey8", "null"));
assertEquals("myvalue10", conf.getString("mykey9", null));
} finally {
- confFile1.delete();
- confFile2.delete();
+ confFile.delete();
tmpDir.delete();
}
}
- /**
- * This test creates several configuration files with values and cross-checks the resulting
- * {@link GlobalConfiguration} object.
- */
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailIfNull() {
+ GlobalConfiguration.loadConfiguration(null);
+ }
+
+ @Test(expected = IllegalConfigurationException.class)
+ public void testFailIfNotLoaded() {
+ GlobalConfiguration.loadConfiguration("/some/path/" + UUID.randomUUID());
+ }
+
+ @Test(expected = IllegalConfigurationException.class)
+ public void testInvalidConfiguration() throws IOException {
+ GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath());
+ }
+
@Test
- public void testConfigurationXML() {
+ // We allow malformed YAML files
+ public void testInvalidYamlFile() throws IOException {
+ final File confFile = tempFolder.newFile(GlobalConfiguration.FLINK_CONF_FILENAME);
- // Create temporary directory for configuration files
- final File tmpDir = getTmpDir();
- final File confFile1 = createRandomFile(tmpDir, ".xml");
- final File confFile2 = createRandomFile(tmpDir, ".xml");
-
- try {
- try {
- final PrintWriter pw1 = new PrintWriter(confFile1);
- final PrintWriter pw2 = new PrintWriter(confFile2);
-
- pw1.append("<configuration>");
- pw2.append("<configuration>");
-
- pw1.append("<property><key>mykey1</key><value>myvalue1</value></property>");
- pw1.append("<property></property>");
- pw1.append("<property><key></key><value></value></property>");
- pw1.append("<property><key>hello</key><value></value></property>");
- pw1.append("<property><key>mykey2</key><value>myvalue2</value></property>");
- pw2.append("<property><key>mykey3</key><value>myvalue3</value></property>");
- pw2.append("<property><key>mykey4</key><value>myvalue4</value></property>");
-
- pw1.append("</configuration>");
- pw2.append("</configuration>");
- pw1.close();
- pw2.close();
- } catch (FileNotFoundException e) {
- fail(e.getMessage());
- }
-
- GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-
- final Configuration co = GlobalConfiguration.getConfiguration();
-
- assertEquals(co.getString("mykey1", "null"), "myvalue1");
- assertEquals(co.getString("mykey2", "null"), "myvalue2");
- assertEquals(co.getString("mykey3", "null"), "myvalue3");
- assertEquals(co.getString("mykey4", "null"), "myvalue4");
-
- // // Test (wrong) string-to integer conversion. should return default value.
- // semantics are changed to throw an exception upon invalid parsing!
- // assertEquals(co.getInteger("mykey1", 500), 500);
- // assertEquals(co.getInteger("anything", 500), 500);
- // assertEquals(co.getBoolean("notexistent", true), true);
-
- // Test include local configuration
- final Configuration newconf = new Configuration();
- newconf.setInteger("mynewinteger", 1000);
- GlobalConfiguration.includeConfiguration(newconf);
- assertEquals(GlobalConfiguration.getInteger("mynewinteger", 0), 1000);
- } finally {
- // Remove temporary files
- confFile1.delete();
- confFile2.delete();
- tmpDir.delete();
+ try (PrintWriter pw = new PrintWriter(confFile);) {
+ pw.append("invalid");
}
+
+ assertNotNull(GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath()));
}
- private File getTmpDir() {
- File tmpDir = new File(CommonTestUtils.getTempDir(), UUID.randomUUID().toString());
- assertTrue(tmpDir.mkdirs());
- return tmpDir;
- }
-
- private File createRandomFile(File path, String suffix) {
- return new File(path, UUID.randomUUID().toString() + suffix);
- }
}
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
index 6096f69..d34f20a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
@@ -23,6 +23,7 @@
import java.io.FileWriter;
import java.io.IOException;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
/**
@@ -30,20 +31,20 @@
*/
public final class TestConfigUtils {
- public static void loadGlobalConf(String[] keys, String[] values) throws IOException {
- loadGlobalConf(getConfAsString(keys, values));
+ public static Configuration loadGlobalConf(String[] keys, String[] values) throws IOException {
+ return loadGlobalConf(getConfAsString(keys, values));
}
- public static void loadGlobalConf(String contents) throws IOException {
+ public static Configuration loadGlobalConf(String contents) throws IOException {
final File tempDir = new File(System.getProperty("java.io.tmpdir"));
- File confDir = null;
+ File confDir;
do {
confDir = new File(tempDir, TestFileUtils.randomFileName());
} while (confDir.exists());
try {
confDir.mkdirs();
- final File confFile = new File(confDir, "tempConfig.xml");
+ final File confFile = new File(confDir, GlobalConfiguration.FLINK_CONF_FILENAME);
try {
BufferedWriter writer = new BufferedWriter(new FileWriter(confFile));
@@ -52,7 +53,7 @@
} finally {
writer.close();
}
- GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath());
+ return GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath());
} finally {
confFile.delete();
}
@@ -61,25 +62,25 @@
confDir.delete();
}
}
-
+
public static String getConfAsString(String[] keys, String[] values) {
if (keys == null || values == null || keys.length != values.length) {
throw new IllegalArgumentException();
}
-
+
StringBuilder bld = new StringBuilder();
- bld.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<configuration>\n");
-
+
for (int i = 0; i < keys.length; i++) {
- bld.append("<property>\n<key>").append(keys[i]).append("</key>\n");
- bld.append("<value>").append(values[i]).append("</value>\n</property>\n");
+ bld.append(keys[i]);
+ bld.append(": ");
+ bld.append(values[i]);
+ bld.append(System.lineSeparator());
}
- bld.append("</configuration>\n");
return bld.toString();
}
// ------------------------------------------------------------------------
-
+
private TestConfigUtils() {}
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index ab4e993..7c41eaf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -104,13 +104,17 @@
* This method is public because its being used in the HadoopDataSource.
*/
public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+
+ org.apache.flink.configuration.Configuration flinkConfiguration =
+ GlobalConfiguration.loadConfiguration();
+
Configuration retConf = new org.apache.hadoop.conf.Configuration();
// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
// the hdfs configuration
// Try to load HDFS configuration from Hadoop's own configuration files
// 1. approach: Flink configuration
- final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants
+ final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants
.HDFS_DEFAULT_CONFIG, null);
if (hdfsDefaultPath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
@@ -118,7 +122,7 @@
LOG.debug("Cannot find hdfs-default configuration file");
}
- final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+ final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
} else {
@@ -127,7 +131,7 @@
// 2. Approach environment variables
String[] possibleHadoopConfPaths = new String[4];
- possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+ possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
if (System.getenv("HADOOP_HOME") != null) {
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
index b219de4..52fd734 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -37,12 +37,14 @@
/**
* Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
*/
- public static void mergeHadoopConf(Configuration configuration) {
- Configuration hadoopConf = org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
-
+ public static void mergeHadoopConf(Configuration hadoopConfig) {
+
+ Configuration hadoopConf =
+ org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
+
for (Map.Entry<String, String> e : hadoopConf) {
- if (configuration.get(e.getKey()) == null) {
- configuration.set(e.getKey(), e.getValue());
+ if (hadoopConfig.get(e.getKey()) == null) {
+ hadoopConfig.set(e.getKey(), e.getValue());
}
}
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 75b82cd..05ed6fa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -46,12 +46,12 @@
public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
- super(filePath);
+ super(filePath, null);
this.primitiveClass = primitiveClass;
}
public PrimitiveInputFormat(Path filePath, String delimiter, Class<OT> primitiveClass) {
- super(filePath);
+ super(filePath, null);
this.primitiveClass = primitiveClass;
this.setDelimiter(delimiter);
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index d6a02f1..b2554bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -50,7 +50,7 @@
// --------------------------------------------------------------------------------------------
public TextInputFormat(Path filePath) {
- super(filePath);
+ super(filePath, null);
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
index a0d20d6..45a2e3e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
@@ -50,7 +50,7 @@
// --------------------------------------------------------------------------------------------
public TextValueInputFormat(Path filePath) {
- super(filePath);
+ super(filePath, null);
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a6cbfa8..d55b9d4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -73,8 +73,10 @@
public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
- public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
- public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+ public static String FLINK_PYTHON2_BINARY_PATH =
+ GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON2_BINARY_KEY, "python");
+ public static String FLINK_PYTHON3_BINARY_PATH =
+ GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON3_BINARY_KEY, "python3");
private static final Random r = new Random();
@@ -113,8 +115,9 @@
}
public PythonPlanBinder() throws IOException {
- FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
- FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+ Configuration conf = GlobalConfiguration.loadConfiguration();
+ FLINK_PYTHON2_BINARY_PATH = conf.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+ FLINK_PYTHON3_BINARY_PATH = conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
FULL_PATH = FLINK_DIR != null
//command-line
? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 12c5dfc..5ab1fbf 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -30,6 +30,7 @@
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -49,7 +50,6 @@
import org.apache.flink.optimizer.plan.WorksetPlanNode;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.optimizer.util.Utils;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -107,10 +107,11 @@
public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
- private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
-
+ private static final boolean mergeIterationAuxTasks =
+ GlobalConfiguration.loadConfiguration().getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
+
private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null, null);
-
+
// ------------------------------------------------------------------------
private Map<PlanNode, JobVertex> vertices; // a map from optimizer nodes to job vertices
@@ -156,7 +157,6 @@
this.useLargeRecordHandler = config.getBoolean(
ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);
-
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 4e05ebe..5d7173b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -176,20 +176,24 @@
* This method is public because its being used in the HadoopDataSource.
*/
public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+
+ org.apache.flink.configuration.Configuration flinkConfiguration =
+ GlobalConfiguration.loadConfiguration();
+
Configuration retConf = new org.apache.hadoop.conf.Configuration();
// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
// the hdfs configuration
// Try to load HDFS configuration from Hadoop's own configuration files
// 1. approach: Flink configuration
- final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+ final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
if (hdfsDefaultPath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
} else {
LOG.debug("Cannot find hdfs-default configuration file");
}
- final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+ final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
} else {
@@ -198,7 +202,7 @@
// 2. Approach environment variables
String[] possibleHadoopConfPaths = new String[4];
- possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+ possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
if (System.getenv("HADOOP_HOME") != null) {
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f14a37f..84d38c1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2350,8 +2350,7 @@
}
LOG.info("Loading configuration from " + configDir)
- GlobalConfiguration.loadConfiguration(configDir)
- val configuration = GlobalConfiguration.getConfiguration()
+ val configuration = GlobalConfiguration.loadConfiguration(configDir)
try {
FileSystem.setDefaultScheme(configuration)
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a7dd789..226fa75 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1526,7 +1526,6 @@
val conf: Configuration = try {
LOG.info("Loading configuration from " + cliConfig.getConfigDir())
GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir())
- GlobalConfiguration.getConfiguration()
}
catch {
case e: Exception => throw new Exception("Could not load configuration", e)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index fa61acf..3e6702a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -46,10 +46,6 @@
@Test
public void testLibraryCacheManagerCleanup() {
- Configuration config = new Configuration();
-
- config.setLong(ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, 1);
- GlobalConfiguration.includeConfiguration(config);
JobID jid = new JobID();
List<BlobKey> keys = new ArrayList<BlobKey>();
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index f3b3507..2f5cc47 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -142,7 +142,7 @@
): (String, Int, Option[Either[FlinkMiniCluster, ClusterClient]]) = {
config.executionMode match {
case ExecutionMode.LOCAL => // Local mode
- val config = GlobalConfiguration.getConfiguration()
+ val config = GlobalConfiguration.loadConfiguration()
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
val miniCluster = new LocalFlinkMiniCluster(config, false)
@@ -189,7 +189,7 @@
val conf = cluster match {
case Some(Left(miniCluster)) => miniCluster.userConfiguration
case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
- case None => GlobalConfiguration.getConfiguration
+ case None => GlobalConfiguration.loadConfiguration()
}
println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 5721a61..ee1b264 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -380,13 +380,13 @@
val repl = externalJars match {
case Some(ej) => new FlinkILoop(
host, port,
- GlobalConfiguration.getConfiguration,
+ GlobalConfiguration.loadConfiguration(),
Option(Array(ej)),
in, new PrintWriter(out))
case None => new FlinkILoop(
host, port,
- GlobalConfiguration.getConfiguration,
+ GlobalConfiguration.loadConfiguration(),
in, new PrintWriter(out))
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 0332684..5b20447 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -42,7 +42,7 @@
if (ctx.getParallelism() > 0) {
setParallelism(ctx.getParallelism());
} else {
- setParallelism(GlobalConfiguration.getInteger(
+ setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index f7cf160..b1521f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -40,7 +40,7 @@
setParallelism(parallelism);
} else {
// determine parallelism
- setParallelism(GlobalConfiguration.getInteger(
+ setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 8ba786f..60ae2ef 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -97,14 +97,6 @@
System.setErr(ERR);
}
- @Before
- public void clearConfig() throws NoSuchFieldException, IllegalAccessException {
- // reset GlobalConfiguration between tests
- Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
- instance.setAccessible(true);
- instance.set(null, null);
- }
-
private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
private static final ApplicationId TEST_YARN_APPLICATION_ID =
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 75445e1a..9b52975 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -110,7 +110,7 @@
String fsStateHandlePath = tmp.getRoot().getPath();
- flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
+ flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 8a2ad60..3caa0ee 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -225,7 +225,7 @@
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
flinkYarnClient.setConfigurationDirectory(confDirPath);
- flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
+ flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
// deploy
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 4df46a6..ba07af1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -81,8 +81,6 @@
public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
- private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-
/**
* Minimum memory requirements, checked by the Client.
*/
@@ -142,10 +140,9 @@
// tries to load the config through the environment, if it fails it can still be set through the setters
try {
this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
- GlobalConfiguration.loadConfiguration(configurationDirectory);
- this.flinkConfiguration = GlobalConfiguration.getConfiguration();
+ this.flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
- File confFile = new File(configurationDirectory + File.separator + CONFIG_FILE_NAME);
+ File confFile = new File(configurationDirectory + File.separator + GlobalConfiguration.FLINK_CONF_FILENAME);
if (!confFile.exists()) {
throw new RuntimeException("Unable to locate configuration file in " + confFile);
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index d19ddde..39b2510 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -418,8 +418,7 @@
private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
LOG.info("Loading config from directory " + baseDirectory);
- GlobalConfiguration.loadConfiguration(baseDirectory);
- Configuration configuration = GlobalConfiguration.getConfiguration();
+ Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);