blob: f12d49d8fc43b1b122bf4248e0bf85b2d2ebbe3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ActiveRMInfoProto;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.codehaus.jettison.json.JSONException;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * Static helpers for Phoenix MapReduce index-build jobs: locating the active YARN
 * ResourceManager through ZooKeeper, querying the ResourceManager REST endpoint for
 * applications, and adjusting job {@link Configuration} values.
 */
public class PhoenixMRJobUtil {

    private static final String YARN_LEADER_ELECTION = "/yarn-leader-election";
    private static final String ACTIVE_STANDBY_ELECTOR_LOCK = "ActiveStandbyElectorLock";
    private static final String RM_APPS_GET_ENDPOINT = "/ws/v1/cluster/apps";

    // Reduced HBase client retry settings applied by updateTimeoutsToFailFast().
    private static final int CLIENT_RETRIES_NUMBER = 2;
    private static final long CLIENT_PAUSE_TIME = 1000;
    private static final int ZOOKEEPER_RECOVERY_RETRY_COUNT = 1;

    public static final String PHOENIX_INDEX_MR_QUEUE_NAME_PROPERTY =
            "phoenix.index.mr.scheduler.capacity.queuename";
    public static final String PHOENIX_INDEX_MR_MAP_MEMORY_PROPERTY =
            "phoenix.index.mr.scheduler.capacity.mapMemoryMB";

    // Default MR Capacity Scheduler configuration for Phoenix MR index build jobs.
    public static final String DEFAULT_QUEUE_NAME = "default";
    public static final int DEFAULT_MAP_MEMORY_MB = 5120;
    // Misspelled original name, retained so existing references keep compiling.
    public static final int DEFAULT_MAP_MEMROY_MB = DEFAULT_MAP_MEMORY_MB;
    public static final String XMX_OPT = "-Xmx";

    public static final String RM_HTTP_SCHEME = "http";
    // TODO - Move these as properties?
    public static final int RM_CONNECT_TIMEOUT_MILLIS = 10 * 1000;
    public static final int RM_READ_TIMEOUT_MILLIS = 10 * 60 * 1000;

    private static final Log LOG = LogFactory.getLog(PhoenixMRJobUtil.class);

    public static final String PHOENIX_MR_SCHEDULER_TYPE_NAME = "phoenix.index.mr.scheduler.type";

    /** Scheduler kinds selectable through {@link #PHOENIX_MR_SCHEDULER_TYPE_NAME}. */
    public enum MR_SCHEDULER_TYPE {
        CAPACITY, FAIR, NONE
    }

    /**
     * Finds the host name of the active YARN ResourceManager.
     * <p>
     * Walks the children of {@code /yarn-leader-election} in ZooKeeper, reads the
     * {@code ActiveStandbyElectorLock} znode of every entry that has one, parses it as an
     * {@link ActiveRMInfoProto} and looks up
     * {@code YarnConfiguration.RM_HOSTNAME + "." + rmId} in {@code config}. When several
     * entries carry a lock, the last one visited wins.
     *
     * @param config configuration used to create the watcher and to resolve the host name
     * @param zkQuorum ZooKeeper connect string
     * @return the configured host name of the active ResourceManager, or {@code null} when
     *         no lock znode was found or no host name is configured for the active RM id
     * @throws IOException if the ZooKeeper client or watcher cannot be created
     * @throws InterruptedException if a ZooKeeper call or the client close is interrupted
     * @throws KeeperException if a ZooKeeper read fails
     * @throws InvalidProtocolBufferException if the lock znode payload cannot be parsed
     */
    public static String getActiveResourceManagerHost(Configuration config, String zkQuorum)
            throws IOException, InterruptedException, JSONException, KeeperException,
            InvalidProtocolBufferException, ZooKeeperConnectionException {
        ZooKeeperWatcher zkw = null;
        ZooKeeper zk = null;
        String activeRMHost = null;
        try {
            zkw = new ZooKeeperWatcher(config, "get-active-yarnmanager", null);
            zk = new ZooKeeper(zkQuorum, 30000, zkw, false);
            List<String> children = zk.getChildren(YARN_LEADER_ELECTION, zkw);
            for (String subEntry : children) {
                String electionPath = YARN_LEADER_ELECTION + "/" + subEntry;
                List<String> subChildren = zk.getChildren(electionPath, zkw);
                for (String eachEntry : subChildren) {
                    if (eachEntry.contains(ACTIVE_STANDBY_ELECTOR_LOCK)) {
                        String path = electionPath + "/" + ACTIVE_STANDBY_ELECTOR_LOCK;
                        byte[] data = zk.getData(path, zkw, new Stat());
                        ActiveRMInfoProto proto = ActiveRMInfoProto.parseFrom(data);
                        String rmId = proto.getRmId();
                        LOG.info("Active RmId : " + rmId);
                        activeRMHost = config.get(YarnConfiguration.RM_HOSTNAME + "." + rmId);
                        LOG.info("activeResourceManagerHostname = " + activeRMHost);
                    }
                }
            }
        } finally {
            // Nested so that the ZooKeeper client is closed even if closing the watcher fails.
            try {
                if (zkw != null) {
                    zkw.close();
                }
            } finally {
                if (zk != null) {
                    zk.close();
                }
            }
        }
        return activeRMHost;
    }

    /**
     * Issues a GET against the ResourceManager applications endpoint
     * ({@code http://rmhost:rmport/ws/v1/cluster/apps}) and returns the response body.
     * <p>
     * Parameter names and values are appended to the query string verbatim; no URL
     * encoding is applied, so callers must pass values that are already URL-safe.
     *
     * @param rmhost ResourceManager host name
     * @param rmport ResourceManager web port
     * @param urlParams optional query parameters; may be {@code null} or empty
     * @return the response body as text
     * @throws MalformedURLException if the assembled URL is not valid
     * @throws IOException if connecting or reading the response fails
     */
    public static String getJobsInformationFromRM(String rmhost, int rmport,
            Map<String, String> urlParams) throws MalformedURLException, ProtocolException,
            UnsupportedEncodingException, IOException {
        HttpURLConnection con = null;
        String response = null;
        String url = null;
        int responseCode = -1;
        try {
            StringBuilder urlBuilder = new StringBuilder();
            urlBuilder.append(RM_HTTP_SCHEME).append("://").append(rmhost).append(":")
                    .append(rmport).append(RM_APPS_GET_ENDPOINT);
            if (urlParams != null && !urlParams.isEmpty()) {
                String separator = "?";
                for (Map.Entry<String, String> param : urlParams.entrySet()) {
                    urlBuilder.append(separator).append(param.getKey()).append("=")
                            .append(param.getValue());
                    separator = "&";
                }
            }
            url = urlBuilder.toString();
            LOG.info("Attempt to get running/submitted jobs information from RM URL = " + url);

            con = (HttpURLConnection) new URL(url).openConnection();
            con.setInstanceFollowRedirects(true);
            con.setRequestMethod("GET");
            con.setConnectTimeout(RM_CONNECT_TIMEOUT_MILLIS);
            con.setReadTimeout(RM_READ_TIMEOUT_MILLIS);
            response = getTextContent(con.getInputStream());
            // Capture the status while the connection is still open; the connection must
            // not be queried after disconnect().
            responseCode = con.getResponseCode();
        } finally {
            if (con != null) {
                con.disconnect();
            }
        }
        LOG.info("Result of attempt to get running/submitted jobs from RM - URL=" + url
                + ",ResponseCode=" + responseCode + ",Response=" + response);
        return response;
    }

    /**
     * Reads the whole stream as UTF-8 text and closes it.
     * <p>
     * Every line read is followed by a single {@code "\n"} in the result, regardless of
     * the line terminator used in the input.
     *
     * @param is stream to drain; always closed before this method returns
     * @return the stream content, one {@code "\n"} appended after each line
     * @throws IOException if reading or closing the stream fails
     */
    public static String getTextContent(InputStream is) throws IOException {
        BufferedReader in = null;
        StringBuilder response = new StringBuilder();
        try {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            in = new BufferedReader(new InputStreamReader(is, "UTF-8"));
            String inputLine;
            while ((inputLine = in.readLine()) != null) {
                response.append(inputLine).append("\n");
            }
        } finally {
            // Close the raw stream as well, in case the reader was never created or its
            // close() failed.
            try {
                if (in != null) {
                    in.close();
                }
            } finally {
                if (is != null) {
                    is.close();
                }
            }
        }
        return response.toString();
    }

    /**
     * Shuts the pool down, escalating to {@link ExecutorService#shutdownNow()} when the
     * submitted tasks do not finish within 200 ms, and logs a warning when the pool has
     * still not terminated 100 ms after that.
     *
     * @param pool executor to shut down
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void shutdown(ExecutorService pool) throws InterruptedException {
        pool.shutdown();
        LOG.debug("Shutdown called");
        // isShutdown() is true as soon as shutdown() has been invoked, so it cannot tell
        // whether the tasks have finished; the result of awaitTermination() can.
        boolean terminated = pool.awaitTermination(200, TimeUnit.MILLISECONDS);
        LOG.debug("Await termination called to wait for 200 msec");
        if (!terminated) {
            pool.shutdownNow();
            LOG.debug("Shutdown now called, awaiting termination for 100 msec");
            terminated = pool.awaitTermination(100, TimeUnit.MILLISECONDS);
        }
        if (!terminated) {
            LOG.warn("Pool did not shutdown");
        }
    }

    /**
     * Extracts the port from the {@code host:port} value of
     * {@link YarnConfiguration#RM_WEBAPP_ADDRESS}.
     *
     * @param conf configuration holding the ResourceManager web application address
     * @return the port part of the configured address
     * @throws IOException if the property is missing, is not of the form
     *         {@code host:port}, or the port is not a valid integer
     */
    public static int getRMPort(Configuration conf) throws IOException {
        String rmHostPortStr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS);
        String[] rmHostPort = (rmHostPortStr == null) ? null : rmHostPortStr.split(":");
        if (rmHostPort == null || rmHostPort.length != 2) {
            throw new IOException("Invalid value for property "
                    + YarnConfiguration.RM_WEBAPP_ADDRESS + " = " + rmHostPortStr);
        }
        try {
            return Integer.parseInt(rmHostPort[1].trim());
        } catch (NumberFormatException e) {
            throw new IOException("Invalid value for property "
                    + YarnConfiguration.RM_WEBAPP_ADDRESS + " = " + rmHostPortStr, e);
        }
    }

    /**
     * Lowers the HBase client retry count, the ZooKeeper recovery retry count and the
     * HBase client pause in {@code conf} to the reduced values defined in this class.
     *
     * @param conf configuration to modify in place
     */
    public static void updateTimeoutsToFailFast(Configuration conf) {
        conf.set("hbase.client.retries.number", String.valueOf(CLIENT_RETRIES_NUMBER));
        conf.set("zookeeper.recovery.retry", String.valueOf(ZOOKEEPER_RECOVERY_RETRY_COUNT));
        conf.set("hbase.client.pause", String.valueOf(CLIENT_PAUSE_TIME));
    }

    /**
     * Sets the configuration values for the Capacity scheduler: the queue name, the map
     * task memory, and a map JVM {@code -Xmx} option equal to 90% of that memory.
     *
     * @param conf - Configuration to which Capacity Queue information to be added
     */
    public static void updateCapacityQueueInfo(Configuration conf) {
        conf.set(MRJobConfig.QUEUE_NAME,
            conf.get(PHOENIX_INDEX_MR_QUEUE_NAME_PROPERTY, DEFAULT_QUEUE_NAME));
        int mapMemoryMB =
                conf.getInt(PHOENIX_INDEX_MR_MAP_MEMORY_PROPERTY, DEFAULT_MAP_MEMORY_MB);
        conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemoryMB);
        conf.set(MRJobConfig.MAP_JAVA_OPTS, XMX_OPT + ((int) (mapMemoryMB * 0.9)) + "m");
        LOG.info("Queue Name=" + conf.get(MRJobConfig.QUEUE_NAME) + ";" + "Map Meory MB="
                + conf.get(MRJobConfig.MAP_MEMORY_MB) + ";" + "Map Java Opts="
                + conf.get(MRJobConfig.MAP_JAVA_OPTS));
    }
}