blob: 15579c6da8899884f654d3cb9c388ade0c99d7bd [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Set, Dict
from py4j.java_gateway import JavaObject
from pyflink.java_gateway import get_gateway
from pyflink.util.utils import add_jars_to_context_class_loader
class Configuration:
"""
Lightweight configuration object which stores key/value pairs.
"""
def __init__(self, other: 'Configuration' = None, j_configuration: JavaObject = None):
"""
Creates a new configuration.
:param other: Optional, if this parameter exists, creates a new configuration with a
copy of the given configuration.
:param j_configuration: Optional, the py4j java configuration object, if this parameter
exists, creates a wrapper for it.
"""
if j_configuration is not None:
self._j_configuration = j_configuration
else:
gateway = get_gateway()
JConfiguration = gateway.jvm.org.apache.flink.configuration.Configuration
if other is not None:
self._j_configuration = JConfiguration(other._j_configuration)
else:
self._j_configuration = JConfiguration()
def get_string(self, key: str, default_value: str) -> str:
"""
Returns the value associated with the given key as a string.
:param key: The key pointing to the associated value.
:param default_value: The default value which is returned in case there is no value
associated with the given key.
:return: The (default) value associated with the given key.
"""
return self._j_configuration.getString(key, default_value)
def set_string(self, key: str, value: str) -> 'Configuration':
"""
Adds the given key/value pair to the configuration object.
:param key: The key of the key/value pair to be added.
:param value: The value of the key/value pair to be added.
"""
jvm = get_gateway().jvm
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
add_jars_to_context_class_loader(value.split(";"))
self._j_configuration.setString(key, value)
return self
def get_integer(self, key: str, default_value: int) -> int:
"""
Returns the value associated with the given key as an integer.
:param key: The key pointing to the associated value.
:param default_value: The default value which is returned in case there is no value
associated with the given key.
:return: The (default) value associated with the given key.
"""
return self._j_configuration.getLong(key, default_value)
def set_integer(self, key: str, value: int) -> 'Configuration':
"""
Adds the given key/value pair to the configuration object.
:param key: The key of the key/value pair to be added.
:param value: The value of the key/value pair to be added.
"""
self._j_configuration.setLong(key, value)
return self
def get_boolean(self, key: str, default_value: bool) -> bool:
"""
Returns the value associated with the given key as a boolean.
:param key: The key pointing to the associated value.
:param default_value: The default value which is returned in case there is no value
associated with the given key.
:return: The (default) value associated with the given key.
"""
return self._j_configuration.getBoolean(key, default_value)
def set_boolean(self, key: str, value: bool) -> 'Configuration':
"""
Adds the given key/value pair to the configuration object.
:param key: The key of the key/value pair to be added.
:param value: The value of the key/value pair to be added.
"""
self._j_configuration.setBoolean(key, value)
return self
def get_float(self, key: str, default_value: float) -> float:
"""
Returns the value associated with the given key as a float.
:param key: The key pointing to the associated value.
:param default_value: The default value which is returned in case there is no value
associated with the given key.
:return: The (default) value associated with the given key.
"""
return self._j_configuration.getDouble(key, float(default_value))
def set_float(self, key: str, value: float) -> 'Configuration':
"""
Adds the given key/value pair to the configuration object.
:param key: The key of the key/value pair to be added.
:param value: The value of the key/value pair to be added.
"""
self._j_configuration.setDouble(key, float(value))
return self
def get_bytearray(self, key: str, default_value: bytearray) -> bytearray:
"""
Returns the value associated with the given key as a byte array.
:param key: The key pointing to the associated value.
:param default_value: The default value which is returned in case there is no value
associated with the given key.
:return: The (default) value associated with the given key.
"""
return bytearray(self._j_configuration.getBytes(key, default_value))
def set_bytearray(self, key: str, value: bytearray) -> 'Configuration':
"""
Adds the given byte array to the configuration object.
:param key: The key under which the bytes are added.
:param value: The byte array to be added.
"""
self._j_configuration.setBytes(key, value)
return self
def key_set(self) -> Set[str]:
"""
Returns the keys of all key/value pairs stored inside this configuration object.
:return: The keys of all key/value pairs stored inside this configuration object.
"""
return set(self._j_configuration.keySet())
def add_all_to_dict(self, target_dict: Dict):
"""
Adds all entries in this configuration to the given dict.
:param target_dict: The dict to be updated.
"""
properties = get_gateway().jvm.java.util.Properties()
self._j_configuration.addAllToProperties(properties)
target_dict.update(properties)
def add_all(self, other: 'Configuration', prefix: str = None) -> 'Configuration':
"""
Adds all entries from the given configuration into this configuration. The keys are
prepended with the given prefix if exist.
:param other: The configuration whose entries are added to this configuration.
:param prefix: Optional, the prefix to prepend.
"""
if prefix is None:
self._j_configuration.addAll(other._j_configuration)
else:
self._j_configuration.addAll(other._j_configuration, prefix)
return self
def contains_key(self, key: str) -> bool:
"""
Checks whether there is an entry with the specified key.
:param key: Key of entry.
:return: True if the key is stored, false otherwise.
"""
return self._j_configuration.containsKey(key)
def to_dict(self) -> Dict[str, str]:
"""
Converts the configuration into a dict representation of string key-pair.
:return: Dict representation of the configuration.
"""
return dict(self._j_configuration.toMap())
def remove_config(self, key: str) -> bool:
"""
Removes given config key from the configuration.
:param key: The config key to remove.
:return: True if config has been removed, false otherwise.
"""
gateway = get_gateway()
JConfigOptions = gateway.jvm.org.apache.flink.configuration.ConfigOptions
config_option = JConfigOptions.key(key).noDefaultValue()
return self._j_configuration.removeConfig(config_option)
def __deepcopy__(self, memodict=None):
return Configuration(j_configuration=self._j_configuration.clone())
def __hash__(self):
return self._j_configuration.hashCode()
def __eq__(self, other):
if isinstance(other, Configuration):
return self._j_configuration.equals(other._j_configuration)
else:
return False
def __str__(self):
return self._j_configuration.toString()