blob: 58f2f0002335e55a31844ee0f42bc7f5dbc429ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.externalresource;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.resources.ExternalResource;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ExternalResourceOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.configuration.ConfigOptions.key;
/** Utility class for external resource framework. */
public class ExternalResourceUtils {
private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class);
private ExternalResourceUtils() {
throw new UnsupportedOperationException("This class should never be instantiated.");
}
/** Get the enabled external resource list from configuration. */
private static Set<String> getExternalResourceSet(Configuration config) {
if (config.getValue(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST)
.equals(ExternalResourceOptions.NONE)) {
return Collections.emptySet();
}
return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST));
}
/**
* Get the external resource configuration keys map, indexed by the resource name. The
* configuration key should be used for deployment specific container request.
*
* @param config Configurations
* @param suffix suffix of config option for deployment specific configuration key
* @return external resource configuration keys map, map the resource name to the configuration
* key for deployment * specific container request
*/
public static Map<String, String> getExternalResourceConfigurationKeys(
Configuration config, String suffix) {
final Set<String> resourceSet = getExternalResourceSet(config);
final Map<String, String> configKeysToResourceNameMap = new HashMap<>();
LOG.info("Enabled external resources: {}", resourceSet);
if (resourceSet.isEmpty()) {
return Collections.emptyMap();
}
final Map<String, String> externalResourceConfigs = new HashMap<>();
for (String resourceName : resourceSet) {
final ConfigOption<String> configKeyOption =
key(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
resourceName, suffix))
.stringType()
.noDefaultValue();
final String configKey = config.get(configKeyOption);
if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
LOG.warn(
"Could not find valid {} for {}. Will ignore that resource.",
configKeyOption.key(),
resourceName);
} else {
configKeysToResourceNameMap.compute(
configKey,
(ignored, previousResource) -> {
if (previousResource != null) {
LOG.warn(
"Duplicate config key {} occurred for external resources, the one named {} will overwrite the value.",
configKey,
resourceName);
externalResourceConfigs.remove(previousResource);
}
return resourceName;
});
externalResourceConfigs.put(resourceName, configKey);
}
}
return externalResourceConfigs;
}
/**
* Instantiate {@link StaticExternalResourceInfoProvider} for all of enabled external resources.
*/
public static ExternalResourceInfoProvider createStaticExternalResourceInfoProviderFromConfig(
Configuration configuration, PluginManager pluginManager) {
final Map<String, Long> externalResourceAmountMap =
getExternalResourceAmountMap(configuration);
LOG.info("Enabled external resources: {}", externalResourceAmountMap.keySet());
return createStaticExternalResourceInfoProvider(
externalResourceAmountMap,
externalResourceDriversFromConfig(configuration, pluginManager));
}
/** Get the map of resource name and amount of all of enabled external resources. */
@VisibleForTesting
static Map<String, Long> getExternalResourceAmountMap(Configuration config) {
final Set<String> resourceSet = getExternalResourceSet(config);
if (resourceSet.isEmpty()) {
return Collections.emptyMap();
}
final Map<String, Long> externalResourceAmountMap = new HashMap<>();
for (String resourceName : resourceSet) {
final ConfigOption<Long> amountOption =
key(ExternalResourceOptions.getAmountConfigOptionForResource(resourceName))
.longType()
.noDefaultValue();
final Optional<Long> amountOpt = config.getOptional(amountOption);
if (!amountOpt.isPresent()) {
LOG.warn(
"The amount of the {} should be configured. Will ignore that resource.",
resourceName);
} else if (amountOpt.get() <= 0) {
LOG.warn(
"The amount of the {} should be positive while finding {}. Will ignore that resource.",
amountOpt.get(),
resourceName);
} else {
externalResourceAmountMap.put(resourceName, amountOpt.get());
}
}
return externalResourceAmountMap;
}
/** Get the collection of all enabled external resources. */
public static Collection<ExternalResource> getExternalResourcesCollection(
Configuration config) {
return getExternalResourceAmountMap(config).entrySet().stream()
.map(entry -> new ExternalResource(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}
/** Generate the string expression of the given external resources. */
public static String generateExternalResourcesString(
Collection<ExternalResource> extendedResources) {
return extendedResources.stream()
.map(resource -> resource.getName() + "=" + resource.getValue())
.collect(Collectors.joining(", "));
}
/**
* Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled
* external resources. {@link ExternalResourceDriver ExternalResourceDrivers} are mapped to its
* resource name.
*/
@VisibleForTesting
static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(
Configuration config, PluginManager pluginManager) {
final Set<String> resourceSet = getExternalResourceSet(config);
if (resourceSet.isEmpty()) {
return Collections.emptyMap();
}
final Iterator<ExternalResourceDriverFactory> factoryIterator =
pluginManager.load(ExternalResourceDriverFactory.class);
final Map<String, ExternalResourceDriverFactory> externalResourceFactories =
new HashMap<>();
factoryIterator.forEachRemaining(
externalResourceDriverFactory ->
externalResourceFactories.put(
externalResourceDriverFactory.getClass().getName(),
externalResourceDriverFactory));
final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
for (String resourceName : resourceSet) {
final ConfigOption<String> driverClassOption =
key(ExternalResourceOptions
.getExternalResourceDriverFactoryConfigOptionForResource(
resourceName))
.stringType()
.noDefaultValue();
final String driverFactoryClassName = config.getString(driverClassOption);
if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
LOG.warn(
"Could not find driver class name for {}. Please make sure {} is configured.",
resourceName,
driverClassOption.key());
continue;
}
ExternalResourceDriverFactory externalResourceDriverFactory =
externalResourceFactories.get(driverFactoryClassName);
if (externalResourceDriverFactory != null) {
DelegatingConfiguration delegatingConfiguration =
new DelegatingConfiguration(
config,
ExternalResourceOptions
.getExternalResourceParamConfigPrefixForResource(
resourceName));
try {
externalResourceDrivers.put(
resourceName,
externalResourceDriverFactory.createExternalResourceDriver(
delegatingConfiguration));
LOG.info("Add external resources driver for {}.", resourceName);
} catch (Exception e) {
LOG.warn(
"Could not instantiate driver with factory {} for {}. {}",
driverFactoryClassName,
resourceName,
e);
}
} else {
LOG.warn(
"Could not find factory class {} for {}.",
driverFactoryClassName,
resourceName);
}
}
return externalResourceDrivers;
}
/**
* Instantiate {@link StaticExternalResourceInfoProvider} for all of enabled external resources.
*/
@VisibleForTesting
static ExternalResourceInfoProvider createStaticExternalResourceInfoProvider(
Map<String, Long> externalResourceAmountMap,
Map<String, ExternalResourceDriver> externalResourceDrivers) {
final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>();
for (Map.Entry<String, ExternalResourceDriver> externalResourceDriverEntry :
externalResourceDrivers.entrySet()) {
final String resourceName = externalResourceDriverEntry.getKey();
final ExternalResourceDriver externalResourceDriver =
externalResourceDriverEntry.getValue();
if (externalResourceAmountMap.containsKey(resourceName)) {
try {
final Set<? extends ExternalResourceInfo> externalResourceInfos;
externalResourceInfos =
externalResourceDriver.retrieveResourceInfo(
externalResourceAmountMap.get(resourceName));
externalResources.put(resourceName, externalResourceInfos);
} catch (Exception e) {
LOG.warn(
"Failed to retrieve information of external resource {}.",
resourceName,
e);
}
} else {
LOG.warn("Could not found legal amount configuration for {}.", resourceName);
}
}
return new StaticExternalResourceInfoProvider(externalResources);
}
}