blob: 954b4f81b695fe085f30792bfb710717d3a3cbab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.security.auth;
import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.cluster.Cluster;
import backtype.storm.Config;
import backtype.storm.utils.Utils;
public class ThriftClient {
private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
private TTransport _transport;
protected TProtocol _protocol;
private String hostPort;
private String host;
private Integer port;
private Map<Object, Object> conf;
private Integer timeout;
private ThriftConnectionType type;
private String asUser;
public ThriftClient(Map conf, ThriftConnectionType type) throws Exception {
this(conf, type, null, null, null, null);
}
@SuppressWarnings("unchecked")
public ThriftClient(Map conf, ThriftConnectionType type, Integer timeout) throws Exception {
this(conf, type, null, null, timeout, null);
}
/**
* This is only for be compatible for Storm
*
* @param conf
* @param type
* @param host
*/
public ThriftClient(Map conf, ThriftConnectionType type, String host) {
this(conf, type, host, null, null, null);
}
public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout) {
this(conf, type, host, port, timeout, null);
}
public ThriftClient(Map conf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser) {
// create a socket with server
this.timeout = timeout;
this.conf = conf;
this.type = type;
this.asUser = asUser;
getMaster(conf, host, port);
reconnect();
}
public static String getMasterByZk(Map conf) throws Exception {
CuratorFramework zkobj = null;
String masterHost = null;
try {
String root = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
String zkMasterDir = root + Cluster.MASTER_SUBTREE;
zkobj = Utils.newCurator(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), zkMasterDir);
zkobj.start();
if (zkobj.checkExists().forPath("/") == null) {
throw new RuntimeException("No alive nimbus ");
}
masterHost = new String(zkobj.getData().forPath("/"));
LOG.info("masterHost:" + masterHost);
return masterHost;
} finally {
if (zkobj != null) {
zkobj.close();
zkobj = null;
}
}
}
public void getMaster(Map conf, String host, Integer port) {
if (StringUtils.isBlank(host) == false) {
this.host = host;
if (port == null) {
port = type.getPort(conf);
}
this.port = port;
this.hostPort = host + ":" + port;
} else {
try {
hostPort = ThriftClient.getMasterByZk(conf);
} catch (Exception e) {
// TODO Auto-generated catch block
throw new RuntimeException("Failed to get master from ZK.", e);
}
String[] host_port = hostPort.split(":");
if (host_port.length != 2) {
throw new InvalidParameterException("Host format error: " + hostPort);
}
this.host = host_port[0];
this.port = Integer.parseInt(host_port[1]);
}
// create a socket with server
if (this.host == null) {
throw new IllegalArgumentException("host is not set");
}
if (this.port == null || this.port <= 0) {
throw new IllegalArgumentException("invalid port: " + port);
}
}
public synchronized TTransport transport() {
return _transport;
}
public synchronized void reconnect() {
close();
try {
TSocket socket = new TSocket(host, port);
if (timeout != null) {
socket.setTimeout(timeout);
} else {
// @@@ Todo
// set the socket default Timeout as xxxx
}
// locate login configuration
Configuration login_conf = AuthUtils.GetConfiguration(conf);
// construct a transport plugin
ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(type, conf, login_conf);
final TTransport underlyingTransport = socket;
// TODO get this from type instead of hardcoding to Nimbus.
// establish client-server transport via plugin
// do retries if the connect fails
TBackoffConnect connectionRetry =
new TBackoffConnect(Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
Utils.getInt(conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
_transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, host, asUser);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
_protocol = null;
if (_transport != null) {
_protocol = new TBinaryProtocol(_transport);
}
}
public synchronized void close() {
if (_transport != null) {
_transport.close();
_transport = null;
_protocol = null;
}
}
}