blob: c1ddb5966424dca441e04d1983656db0a3e81870 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.utils;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.Supervisor;
import org.apache.storm.security.auth.ThriftClient;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Client for interacting with Supervisor server, now we use supervisor server mainly for cases below.
* <ul>
* <li>worker <- supervisor: get worker local assignment for a storm.</li>
* <li>nimbus -> supervisor: assign assignments for a node.</li>
* </ul>
*/
public class SupervisorClient extends ThriftClient implements SupervisorIfaceFactory {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorClient.class);
private Supervisor.Client client;
public SupervisorClient(Map conf, String host, int port) throws TTransportException {
this(conf, host, port, null, null);
}
public SupervisorClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
super(conf, ThriftConnectionType.SUPERVISOR, host, port, timeout, null);
client = new Supervisor.Client(protocol);
}
public SupervisorClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException {
super(conf, ThriftConnectionType.SUPERVISOR, host, port, timeout, asUser);
client = new Supervisor.Client(protocol);
}
public SupervisorClient(Map conf, String host) throws TTransportException {
super(conf, ThriftConnectionType.SUPERVISOR, host, null, null, null);
client = new Supervisor.Client(protocol);
}
public static SupervisorClient getConfiguredClient(Map conf, String host) {
//use the default server port.
int port = Integer.parseInt(conf.get(Config.SUPERVISOR_THRIFT_PORT).toString());
return getConfiguredClientAs(conf, host, port, null);
}
public static SupervisorClient getConfiguredClient(Map conf, String host, int port) {
return getConfiguredClientAs(conf, host, port, null);
}
public static SupervisorClient getConfiguredClientAs(Map conf, String host, int port, String asUser) {
if (conf.containsKey(Config.STORM_DO_AS_USER)) {
if (asUser != null && !asUser.isEmpty()) {
LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence.",
asUser,
conf.get(Config.STORM_DO_AS_USER));
}
asUser = (String) conf.get(Config.STORM_DO_AS_USER);
}
try {
return new SupervisorClient(conf, host, port, null, asUser);
} catch (TTransportException e) {
throw new RuntimeException("Failed to create a supervisor client for host " + host);
}
}
@Override
public Supervisor.Client getIface() {
return client;
}
}