blob: 6361bd6e76933cf78e1c37c224993a2c71f295a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal;
import org.apache.twill.api.Configs;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillSpecification;
import java.net.URI;
import java.util.Map;
import javax.annotation.Nullable;
/**
 * Represents runtime specification of a {@link TwillApplication}.
 *
 * <p>This class is a plain holder of the values given to its constructor, plus a few convenience
 * methods that derive settings (heap ratio, reserved memory, log collection) from the application level
 * and per-runnable configuration maps. The maps are stored and returned as given, without copying.
 */
public class TwillRuntimeSpecification {

  private final TwillSpecification twillSpecification;
  private final String fsUser;
  private final URI twillAppDir;
  private final String zkConnectStr;
  private final RunId twillRunId;
  private final String twillAppName;
  private final String rmSchedulerAddr;
  private final Map<String, Map<String, String>> logLevels;
  private final Map<String, Integer> maxRetries;
  private final Map<String, String> config;
  private final Map<String, Map<String, String>> runnableConfigs;

  /**
   * Creates a new runtime specification.
   *
   * @param twillSpecification the application specification
   * @param fsUser the file system user
   * @param twillAppDir the application directory
   * @param zkConnectStr the ZooKeeper connection string
   * @param twillRunId the run id of the application
   * @param twillAppName the name of the application
   * @param rmSchedulerAddr the resource manager scheduler address; may be {@code null}
   * @param logLevels log level settings, returned as-is by {@link #getLogLevels()}
   * @param maxRetries max retries settings, returned as-is by {@link #getMaxRetries()}
   * @param config the application level configuration
   * @param runnableConfigs the per-runnable configurations, keyed by runnable name
   */
  public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
                                   String zkConnectStr, RunId twillRunId, String twillAppName,
                                   @Nullable String rmSchedulerAddr, Map<String, Map<String, String>> logLevels,
                                   Map<String, Integer> maxRetries, Map<String, String> config,
                                   Map<String, Map<String, String>> runnableConfigs) {
    this.twillSpecification = twillSpecification;
    this.fsUser = fsUser;
    this.twillAppDir = twillAppDir;
    this.zkConnectStr = zkConnectStr;
    this.twillRunId = twillRunId;
    this.twillAppName = twillAppName;
    this.rmSchedulerAddr = rmSchedulerAddr;
    this.logLevels = logLevels;
    this.maxRetries = maxRetries;
    this.config = config;
    this.runnableConfigs = runnableConfigs;
  }

  /**
   * Returns the {@link TwillSpecification} given at construction time.
   */
  public TwillSpecification getTwillSpecification() {
    return twillSpecification;
  }

  /**
   * Returns the file system user given at construction time.
   */
  public String getFsUser() {
    return fsUser;
  }

  /**
   * Returns the application directory given at construction time.
   */
  public URI getTwillAppDir() {
    return twillAppDir;
  }

  /**
   * Returns the ZooKeeper connection string given at construction time.
   */
  public String getZkConnectStr() {
    return zkConnectStr;
  }

  /**
   * Returns the {@link RunId} of the application given at construction time.
   */
  public RunId getTwillAppRunId() {
    return twillRunId;
  }

  /**
   * Returns the application name given at construction time.
   */
  public String getTwillAppName() {
    return twillAppName;
  }

  /**
   * Returns the minimum heap ratio for the application master.
   *
   * @return the value configured with {@link Configs.Keys#HEAP_RESERVED_MIN_RATIO} in the application
   *         configuration, or {@link Configs.Defaults#HEAP_RESERVED_MIN_RATIO} if the key is absent
   * @throws IllegalArgumentException if the configured value is missing, not a number, or not greater than zero
   */
  public double getAMMinHeapRatio() {
    return getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO);
  }

  /**
   * Returns the minimum heap ratio for the given runnable.
   *
   * <p>The runnable specific configuration takes precedence. If it does not define the ratio, the
   * application level configuration (and ultimately the default) is used.
   *
   * @param runnableName name of the runnable
   * @throws IllegalArgumentException if the configured value is missing, not a number, or not greater than zero
   */
  public double getMinHeapRatio(String runnableName) {
    // 0 can never be a valid configured ratio, hence it is used as the "not configured" marker here.
    double ratio = getMinHeapRatio(runnableConfigs.get(runnableName), 0d);
    return ratio <= 0d ? getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO) : ratio;
  }

  /**
   * Returns the reserved non-heap memory size in MB for the application master.
   *
   * @return the value configured with {@link Configs.Keys#YARN_AM_RESERVED_MEMORY_MB} in the application
   *         configuration, or {@link Configs.Defaults#YARN_AM_RESERVED_MEMORY_MB} if the key is absent
   * @throws IllegalArgumentException if the configured value is not an integer or is negative
   */
  public int getAMReservedMemory() {
    return getReservedMemory(config, Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
                             Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
  }

  /**
   * Returns the reserved non-heap memory size in MB for the given runnable.
   *
   * <p>The runnable specific configuration takes precedence. If it does not define
   * {@link Configs.Keys#JAVA_RESERVED_MEMORY_MB}, the application level configuration (and ultimately
   * {@link Configs.Defaults#JAVA_RESERVED_MEMORY_MB}) is used.
   *
   * @param runnableName name of the runnable
   * @throws IllegalArgumentException if the configured value is not an integer or is negative
   */
  public int getReservedMemory(String runnableName) {
    // A negative value can never be a valid configured size, hence -1 is used as the "not configured" marker.
    int memory = getReservedMemory(runnableConfigs.get(runnableName), Configs.Keys.JAVA_RESERVED_MEMORY_MB, -1);
    return memory < 0 ? getReservedMemory(config, Configs.Keys.JAVA_RESERVED_MEMORY_MB,
                                          Configs.Defaults.JAVA_RESERVED_MEMORY_MB) : memory;
  }

  /**
   * Returns whether log collection is enabled.
   *
   * @return the boolean value of {@link Configs.Keys#LOG_COLLECTION_ENABLED} in the application configuration,
   *         or {@link Configs.Defaults#LOG_COLLECTION_ENABLED} if the key is absent
   */
  public boolean isLogCollectionEnabled() {
    return config.containsKey(Configs.Keys.LOG_COLLECTION_ENABLED) ?
      Boolean.parseBoolean(config.get(Configs.Keys.LOG_COLLECTION_ENABLED)) :
      Configs.Defaults.LOG_COLLECTION_ENABLED;
  }

  /**
   * Returns the resource manager scheduler address given at construction time, which may be {@code null}.
   */
  @Nullable
  public String getRmSchedulerAddr() {
    return rmSchedulerAddr;
  }

  /**
   * Returns the log levels map given at construction time.
   */
  public Map<String, Map<String, String>> getLogLevels() {
    return logLevels;
  }

  /**
   * Returns the max retries map given at construction time.
   */
  public Map<String, Integer> getMaxRetries() {
    return maxRetries;
  }

  /**
   * Returns the configuration for the application.
   */
  public Map<String, String> getConfig() {
    return config;
  }

  /**
   * Returns the configurations for each runnable.
   */
  public Map<String, Map<String, String>> getRunnableConfigs() {
    return runnableConfigs;
  }

  /**
   * Returns the ZK connection string for the Kafka used for log collections,
   * or {@code null} if log collection is disabled.
   */
  @Nullable
  public String getKafkaZKConnect() {
    if (!isLogCollectionEnabled()) {
      return null;
    }
    // When addressing TWILL-147, a field can be introduced to carry this value.
    return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId());
  }

  /**
   * Returns the minimum heap ratio ({@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration.
   *
   * @param config the configuration to read from; may be {@code null}
   * @param defaultValue the value to return if the configuration is {@code null} or does not contain the key
   * @return the configured ratio, which is always greater than zero, or the given default value
   * @throws IllegalArgumentException if the key is present but its value is {@code null}, cannot be parsed as
   *                                  a double, is NaN, or is not greater than zero
   */
  private double getMinHeapRatio(@Nullable Map<String, String> config, double defaultValue) {
    if (config == null || !config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) {
      return defaultValue;
    }

    // Double.parseDouble throws NullPointerException (not NumberFormatException) on null, so a key that is
    // present with a null value has to be rejected explicitly to report it the same way as other bad values.
    String value = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
    if (value == null) {
      throw new IllegalArgumentException("Failed to parse the minimum heap ratio from configuration with key '" +
                                           Configs.Keys.HEAP_RESERVED_MIN_RATIO + "'");
    }

    double ratio;
    try {
      ratio = Double.parseDouble(value);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Failed to parse the minimum heap ratio from configuration with key '" +
                                           Configs.Keys.HEAP_RESERVED_MIN_RATIO + "'", e);
    }

    // Written as a negated "> 0" so that NaN (for which every comparison is false) is rejected as well.
    if (!(ratio > 0d)) {
      throw new IllegalArgumentException("Minimum heap ratio configured with key '" +
                                           Configs.Keys.HEAP_RESERVED_MIN_RATIO +
                                           "' must be > 0. It is configured to " + ratio);
    }
    return ratio;
  }

  /**
   * Returns the reserved memory size based on the given configuration.
   *
   * @param config the configuration to read from; may be {@code null}
   * @param key the configuration key that carries the reserved memory size
   * @param defaultValue the value to return if the configuration is {@code null} or does not contain the key
   * @return the configured size, which is never negative, or the given default value
   * @throws IllegalArgumentException if the key is present but its value cannot be parsed as an integer
   *                                  or is negative
   */
  private int getReservedMemory(@Nullable Map<String, String> config, String key, int defaultValue) {
    if (config == null || !config.containsKey(key)) {
      return defaultValue;
    }

    int memory;
    try {
      // Integer.parseInt reports a null value as NumberFormatException, so it is covered by the catch below.
      memory = Integer.parseInt(config.get(key));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Failed to parse the reserved memory size from configuration with key '" +
                                           key + "'", e);
    }

    if (memory < 0) {
      throw new IllegalArgumentException("Reserved memory size configured with key '" + key +
                                           "' must be >= 0. It is configured to " + memory);
    }
    return memory;
  }
}