blob: 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;
import org.apache.samza.SamzaException;
public class YarnConfig extends MapConfig {
/**
* (Required) URL from which the job package can be downloaded
*/
public static final String PACKAGE_PATH = "yarn.package.path";
// Configs related to each yarn container
/**
* Memory, in megabytes, to request from YARN per container
*/
public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
private static final int DEFAULT_CONTAINER_MEM = 1024;
/**
* Name of YARN queue to run jobs on
*/
public static final String QUEUE_NAME = "yarn.queue";
/**
* Number of CPU cores to request from YARN per container
*/
public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
private static final int DEFAULT_CPU_CORES = 1;
/**
* Maximum number of times the AM tries to restart a failed container
*/
public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
/**
* Determines how frequently a container is allowed to fail before we give up and fail the job
*/
public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms";
private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;
// Configs related to the Samza Application Master (AM)
/**
* (Optional) JVM options to include in the command line when executing the AM
*/
public static final String AM_JVM_OPTIONS = "yarn.am.opts";
/**
* Determines whether a JMX server should be started on the AM
* Default: true
*/
public static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
/**
* Memory, in megabytes, to request from YARN for running the AM
*/
public static final String AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb";
private static final int DEFAULT_AM_CONTAINER_MAX_MEMORY_MB = 1024;
/**
* Determines the interval for the Heartbeat between the AM and the Yarn RM
*/
public static final String AM_POLL_INTERVAL_MS = "yarn.am.poll.interval.ms";
private static final int DEFAULT_POLL_INTERVAL_MS = 1000;
/**
* (Optional) JAVA_HOME path for Samza AM
*/
public static final String AM_JAVA_HOME = "yarn.am.java.home";
// Configs related to the ContainerAllocator thread
/**
* Sleep interval for the allocator thread in milliseconds
*/
public static final String ALLOCATOR_SLEEP_MS = "yarn.allocator.sleep.ms";
private static final int DEFAULT_ALLOCATOR_SLEEP_MS = 3600;
/**
* Number of milliseconds before a container request is considered to have to expired
*/
public static final String CONTAINER_REQUEST_TIMEOUT_MS = "yarn.container.request.timeout.ms";
private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
/**
* Flag to indicate if host-affinity is enabled for the job or not
*/
public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
/**
* Principal used to log in on a Kerberized secure cluster
*/
public static final String YARN_KERBEROS_PRINCIPAL = "yarn.kerberos.principal";
/**
* Key tab used to log in on a Kerberized secure cluster
*/
public static final String YARN_KERBEROS_KEYTAB = "yarn.kerberos.keytab";
/**
* Interval in seconds to renew a delegation token in Kerberized secure cluster
*/
public static final String YARN_TOKEN_RENEWAL_INTERVAL_SECONDS = "yarn.token.renewal.interval.seconds";
private static final long DEFAULT_YARN_TOKEN_RENEWAL_INTERVAL_SECONDS = 24 * 3600;
/**
* The location on HDFS to store the credentials file
*/
public static final String YARN_CREDENTIALS_FILE = "yarn.credentials.file";
/**
* The staging directory on HDFS for the job
*/
public static final String YARN_JOB_STAGING_DIRECTORY = "yarn.job.staging.directory";
public YarnConfig(Config config) {
super(config);
}
public int getContainerRetryCount() {
return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
}
public int getContainerRetryWindowMs() {
return getInt(CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS);
}
public int getAMPollIntervalMs() {
return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
}
public int getContainerMaxMemoryMb() {
return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
}
public int getContainerMaxCpuCores() {
return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
}
public boolean getJmxServerEnabled() {
return getBoolean(AM_JMX_ENABLED, true);
}
public String getPackagePath() {
String packagePath = get(PACKAGE_PATH);
if (packagePath == null) {
throw new SamzaException("No YARN package path defined in config.");
}
return packagePath;
}
public int getAMContainerMaxMemoryMb() {
return getInt(AM_CONTAINER_MAX_MEMORY_MB, DEFAULT_AM_CONTAINER_MAX_MEMORY_MB);
}
public String getAmOpts() {
return get(AM_JVM_OPTIONS, "");
}
public String getQueueName() {
return get(QUEUE_NAME, null);
}
public String getAMJavaHome() {
return get(AM_JAVA_HOME, null);
}
public int getAllocatorSleepTime() {
return getInt(ALLOCATOR_SLEEP_MS, DEFAULT_ALLOCATOR_SLEEP_MS);
}
public int getContainerRequestTimeout() {
return getInt(CONTAINER_REQUEST_TIMEOUT_MS, DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS);
}
public boolean getHostAffinityEnabled() {
return getBoolean(HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED);
}
public String getYarnKerberosPrincipal() {
return get(YARN_KERBEROS_PRINCIPAL, null);
}
public String getYarnKerberosKeytab() {
return get(YARN_KERBEROS_KEYTAB, null);
}
public long getYarnTokenRenewalIntervalSeconds() {
return getLong(YARN_TOKEN_RENEWAL_INTERVAL_SECONDS, DEFAULT_YARN_TOKEN_RENEWAL_INTERVAL_SECONDS);
}
public String getYarnCredentialsFile() {
return get(YARN_CREDENTIALS_FILE, null);
}
public String getYarnJobStagingDirectory() {
return get(YARN_JOB_STAGING_DIRECTORY, null);
}
}