blob: 69dcb3b5107f5e5f29dbd0baa70827b69f2eacf7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.client.ipc;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.SliderClusterProtocol;
import org.apache.slider.api.StateValues;
import org.apache.slider.api.proto.Messages;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.api.types.PingInformation;
import org.apache.slider.common.tools.Duration;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.WaitTimeoutException;
import org.apache.slider.core.persist.ConfTreeSerDeser;
import org.apache.slider.server.services.security.SecurityStore;
import org.apache.slider.server.services.security.SignCertResponse;
import org.codehaus.jackson.JsonParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.slider.api.proto.RestTypeMarshalling.*;
/**
 * Cluster operations at a slightly higher level than the RPC code.
 * <p>
 * Each method builds the protobuf request for a {@link SliderClusterProtocol}
 * call, issues it against the bound AM, and converts the response into
 * client-side types.
 */
public class SliderClusterOperations {
  protected static final Logger
      log = LoggerFactory.getLogger(SliderClusterOperations.class);

  /** Shared empty request payload, used by the model/resource getters. */
  private static final Messages.EmptyPayloadProto EMPTY =
      Messages.EmptyPayloadProto.newBuilder().build();

  /** Polling interval used by {@link #waitForRoleInstanceLive(String, long)}. */
  private static final long WAIT_POLL_INTERVAL_MILLIS = 1000;

  /** The IPC binding to the application master. */
  private final SliderClusterProtocol appMaster;

  /**
   * Create an operations wrapper around an AM binding.
   * @param appMaster IPC binding to the application master
   */
  public SliderClusterOperations(SliderClusterProtocol appMaster) {
    this.appMaster = appMaster;
  }

  @Override
  public String toString() {
    final StringBuilder sb =
        new StringBuilder("SliderClusterOperations{");
    sb.append("IPC binding=").append(appMaster);
    sb.append('}');
    return sb.toString();
  }

  /**
   * Get a node from the AM
   * @param uuid uuid of node
   * @return deserialized node
   * @throws IOException IO problems
   * @throws NoSuchNodeException if the node isn't found
   * @throws YarnException on YARN-level failures of the call
   */
  public ClusterNode getNode(String uuid)
      throws IOException, NoSuchNodeException, YarnException {
    Messages.GetNodeRequestProto req =
        Messages.GetNodeRequestProto.newBuilder().setUuid(uuid).build();
    Messages.GetNodeResponseProto node = appMaster.getNode(req);
    return ClusterNode.fromProtobuf(node.getClusterNode());
  }

  /**
   * Unmarshall a list of nodes from a protobuf response
   * @param nodes node list
   * @return possibly empty list of cluster nodes, in the same order as the input
   * @throws IOException if a node cannot be converted
   */
  public List<ClusterNode> convertNodeWireToClusterNodes(List<Messages.RoleInstanceState> nodes)
      throws IOException {
    List<ClusterNode> nodeList = new ArrayList<ClusterNode>(nodes.size());
    for (Messages.RoleInstanceState node : nodes) {
      nodeList.add(ClusterNode.fromProtobuf(node));
    }
    return nodeList;
  }

  /**
   * Echo text (debug action)
   * @param text text
   * @return the text, echoed back
   * @throws YarnException
   * @throws IOException
   */
  public String echo(String text) throws
      YarnException,
      IOException {
    Messages.EchoRequestProto req =
        Messages.EchoRequestProto.newBuilder().setText(text).build();
    Messages.EchoResponseProto response = appMaster.echo(req);
    return response.getText();
  }

  /**
   * Connect to a live cluster and get its current state
   * @return its description, parsed from the JSON status returned by the AM
   * @throws JsonParseException if the returned JSON cannot be parsed; the
   * offending JSON is logged before rethrowing
   * @throws YarnException
   * @throws IOException
   */
  public ClusterDescription getClusterDescription()
      throws YarnException, IOException {
    Messages.GetJSONClusterStatusRequestProto req =
        Messages.GetJSONClusterStatusRequestProto.newBuilder().build();
    Messages.GetJSONClusterStatusResponseProto resp =
        appMaster.getJSONClusterStatus(req);
    String statusJson = resp.getClusterSpec();
    try {
      return ClusterDescription.fromJson(statusJson);
    } catch (JsonParseException e) {
      log.error(
          "Exception " + e + " parsing:\n" + statusJson,
          e);
      throw e;
    }
  }

  /**
   * Get the AM instance definition.
   * <p>
   * See {@link SliderClusterProtocol#getInstanceDefinition(Messages.GetInstanceDefinitionRequestProto)}
   * @return current slider AM aggregate definition, built from the internal,
   * resources and application JSON trees in the response
   * @throws YarnException
   * @throws IOException
   */
  public AggregateConf getInstanceDefinition()
      throws YarnException, IOException {
    Messages.GetInstanceDefinitionRequestProto request =
        Messages.GetInstanceDefinitionRequestProto.newBuilder().build();
    Messages.GetInstanceDefinitionResponseProto response =
        appMaster.getInstanceDefinition(request);
    ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser();
    ConfTree internal = confTreeSerDeser.fromJson(response.getInternal());
    ConfTree resources = confTreeSerDeser.fromJson(response.getResources());
    ConfTree app = confTreeSerDeser.fromJson(response.getApplication());
    return new AggregateConf(resources, app, internal);
  }

  /**
   * Kill a container
   * @param id container ID
   * @return the success flag returned by the AM
   * @throws YarnException
   * @throws IOException
   */
  public boolean killContainer(String id) throws
      YarnException,
      IOException {
    Messages.KillContainerRequestProto req =
        Messages.KillContainerRequestProto.newBuilder().setId(id).build();
    Messages.KillContainerResponseProto response =
        appMaster.killContainer(req);
    return response.getSuccess();
  }

  /**
   * List all node UUIDs in a role
   * @param role role name or "" for all
   * @return an array of UUID strings
   * @throws IOException
   * @throws YarnException
   */
  public String[] listNodeUUIDsByRole(String role) throws
      IOException,
      YarnException {
    Collection<String> uuidList = innerListNodeUUIDSByRole(role);
    return uuidList.toArray(new String[0]);
  }

  /**
   * List all node UUIDs in a role, as returned by the AM.
   * @param role role name or "" for all
   * @return the list of UUID strings from the response
   * @throws IOException
   * @throws YarnException
   */
  public List<String> innerListNodeUUIDSByRole(String role) throws
      IOException,
      YarnException {
    Messages.ListNodeUUIDsByRoleRequestProto req =
        Messages.ListNodeUUIDsByRoleRequestProto
            .newBuilder()
            .setRole(role)
            .build();
    Messages.ListNodeUUIDsByRoleResponseProto resp =
        appMaster.listNodeUUIDsByRole(req);
    return resp.getUuidList();
  }

  /**
   * List all nodes in a role. This is a double round trip: once to list
   * the nodes in a role, another to get their details
   * @param role role name
   * @return a list of ClusterNode instances
   * @throws IOException
   * @throws YarnException
   */
  public List<ClusterNode> listClusterNodesInRole(String role) throws
      IOException,
      YarnException {
    Collection<String> uuidList = innerListNodeUUIDSByRole(role);
    Messages.GetClusterNodesRequestProto req =
        Messages.GetClusterNodesRequestProto
            .newBuilder()
            .addAllUuid(uuidList)
            .build();
    Messages.GetClusterNodesResponseProto resp = appMaster.getClusterNodes(req);
    return convertNodeWireToClusterNodes(resp.getClusterNodeList());
  }

  /**
   * Get the details on a list of uuids
   * @param uuids node UUIDs to look up
   * @return a possibly empty list of node details
   * @throws IOException
   * @throws YarnException
   */
  @VisibleForTesting
  public List<ClusterNode> listClusterNodes(String[] uuids) throws
      IOException,
      YarnException {
    Messages.GetClusterNodesRequestProto req =
        Messages.GetClusterNodesRequestProto
            .newBuilder()
            .addAllUuid(Arrays.asList(uuids))
            .build();
    Messages.GetClusterNodesResponseProto resp = appMaster.getClusterNodes(req);
    return convertNodeWireToClusterNodes(resp.getClusterNodeList());
  }

  /**
   * Wait for an instance of a named role to be live (or past it in the lifecycle).
   * Only the first instance listed for the role is inspected on each poll;
   * polling happens once per {@link #WAIT_POLL_INTERVAL_MILLIS}.
   * @param role role to look for
   * @param timeout time to wait, in millis
   * @return the state. If LIVE, all is well. If >LIVE, it has shut for a reason
   * @throws IOException IO
   * @throws InterruptedIOException if the thread is interrupted while waiting;
   * the thread's interrupt status is restored before throwing
   * @throws YarnException YARN-level failures of the underlying calls
   * @throws WaitTimeoutException if the wait timed out
   */
  @VisibleForTesting
  public int waitForRoleInstanceLive(String role, long timeout)
      throws WaitTimeoutException, IOException, YarnException {
    Duration duration = new Duration(timeout);
    duration.start();
    boolean live = false;
    int state = StateValues.STATE_CREATED;
    log.info("Waiting {} millis for a live node in role {}", timeout, role);
    try {
      while (!live) {
        // see if there is a node in that role yet
        List<String> uuids = innerListNodeUUIDSByRole(role);
        int roleCount = uuids.size();
        ClusterNode roleInstance = null;
        if (roleCount != 0) {
          // if there is, get the details of the first one
          roleInstance = getNode(uuids.get(0));
          if (roleInstance != null) {
            state = roleInstance.state;
            live = state >= StateValues.STATE_LIVE;
          }
        }
        if (!live) {
          if (duration.getLimitExceeded()) {
            throw new WaitTimeoutException(
                String.format("Timeout after %d millis" +
                        " waiting for a live instance of type %s; " +
                        "instances found %d %s",
                    timeout, role, roleCount,
                    (roleInstance != null
                        ? (" instance -\n" + roleInstance.toString())
                        : "")
                ));
          }
          sleepBeforeNextPoll(role);
        }
      }
    } finally {
      duration.close();
    }
    return state;
  }

  /**
   * Sleep between polls, converting an interrupt into an
   * {@link InterruptedIOException} rather than silently swallowing it.
   * @param role role being waited on; used in the exception text
   * @throws InterruptedIOException if interrupted; interrupt status is restored
   */
  private static void sleepBeforeNextPoll(String role)
      throws InterruptedIOException {
    try {
      Thread.sleep(WAIT_POLL_INTERVAL_MILLIS);
    } catch (InterruptedException e) {
      // restore the flag so callers further up the stack can see it
      Thread.currentThread().interrupt();
      InterruptedIOException iioe = new InterruptedIOException(
          "Interrupted while waiting for a live instance of type " + role);
      iioe.initCause(e);
      throw iioe;
    }
  }

  /**
   * Flex operation
   * @param resources new resources
   * @return the boolean response flag returned by the AM
   * @throws IOException
   */
  public boolean flex(ConfTree resources) throws IOException {
    Messages.FlexClusterRequestProto request =
        Messages.FlexClusterRequestProto.newBuilder()
            .setClusterSpec(resources.toJson())
            .build();
    Messages.FlexClusterResponseProto response =
        appMaster.flexCluster(request);
    return response.getResponse();
  }

  /**
   * Commit (possibly delayed) AM suicide
   *
   * @param text text to log; omitted from the request if null
   * @param signal exit code
   * @param delay delay in millis
   * @throws IOException
   */
  public void amSuicide(String text, int signal, int delay)
      throws IOException {
    Messages.AMSuicideRequestProto.Builder builder =
        Messages.AMSuicideRequestProto.newBuilder();
    if (text != null) {
      builder.setText(text);
    }
    builder.setSignal(signal);
    builder.setDelay(delay);
    // the response is not used by this client
    appMaster.amSuicide(builder.build());
  }

  /**
   * Get the application liveness
   * @return current liveness information
   * @throws IOException
   */
  public ApplicationLivenessInformation getLivenessInformation() throws IOException {
    Messages.GetApplicationLivenessRequestProto request =
        Messages.GetApplicationLivenessRequestProto.newBuilder().build();
    Messages.ApplicationLivenessInformationProto wire =
        appMaster.getLivenessInformation(request);
    return unmarshall(wire);
  }

  /**
   * Get the desired model (aggregate configuration) from the AM.
   * @return the unmarshalled aggregate configuration
   * @throws IOException
   */
  public AggregateConf getModelDesired() throws IOException {
    return unmarshallToAggregateConf(appMaster.getModelDesired(EMPTY));
  }

  /**
   * Get the desired application configuration from the AM.
   * @return the unmarshalled configuration tree operations
   * @throws IOException
   */
  public ConfTreeOperations getModelDesiredAppconf() throws IOException {
    return unmarshallToCTO(
        appMaster.getModelDesiredAppconf(EMPTY));
  }

  /**
   * Get the desired resources configuration from the AM.
   * @return the unmarshalled configuration tree operations
   * @throws IOException
   */
  public ConfTreeOperations getModelDesiredResources() throws IOException {
    return unmarshallToCTO(
        appMaster.getModelDesiredResources(EMPTY));
  }

  /**
   * Get the resolved model (aggregate configuration) from the AM.
   * @return the unmarshalled aggregate configuration
   * @throws IOException
   */
  public AggregateConf getModelResolved() throws IOException {
    return unmarshallToAggregateConf(
        appMaster.getModelResolved(EMPTY));
  }

  /**
   * Get the resolved application configuration from the AM.
   * @return the unmarshalled configuration tree operations
   * @throws IOException
   */
  public ConfTreeOperations getModelResolvedAppconf() throws IOException {
    return unmarshallToCTO(
        appMaster.getModelResolvedAppconf(EMPTY));
  }

  /**
   * Get the resolved resources configuration from the AM.
   * @return the unmarshalled configuration tree operations
   * @throws IOException
   */
  public ConfTreeOperations getModelResolvedResources() throws IOException {
    // previously called getModelDesiredResources by mistake
    return unmarshallToCTO(
        appMaster.getModelResolvedResources(EMPTY));
  }

  /**
   * Get the live resources from the AM.
   * @return the unmarshalled configuration tree operations
   * @throws IOException
   */
  public ConfTreeOperations getLiveResources() throws IOException {
    return unmarshallToCTO(
        appMaster.getLiveResources(EMPTY));
  }

  /**
   * Enumerate the live containers, keyed by the names returned by the AM.
   * @return a map of name to container information
   * @throws IOException on IPC failure, or if the number of names and
   * records in the response differ
   */
  public Map<String, ContainerInformation> enumContainers() throws IOException {
    Messages.GetLiveContainersResponseProto response =
        appMaster.getLiveContainers(
            Messages.GetLiveContainersRequestProto.newBuilder().build());
    int namesCount = response.getNamesCount();
    checkNameRecordCounts(namesCount, response.getContainersCount());
    Map<String, ContainerInformation> map =
        new HashMap<String, ContainerInformation>(namesCount);
    for (int i = 0; i < namesCount; i++) {
      map.put(response.getNames(i), unmarshall(response.getContainers(i)));
    }
    return map;
  }

  /**
   * Get information about a single live container.
   * @param containerId container ID
   * @return the container information
   * @throws IOException
   */
  public ContainerInformation getContainer(String containerId) throws
      IOException {
    Messages.ContainerInformationProto response =
        appMaster.getLiveContainer(
            Messages.GetLiveContainerRequestProto.newBuilder()
                .setContainerId(containerId)
                .build());
    return unmarshall(response);
  }

  /**
   * List the live containers.
   * @return the unmarshalled list of container information
   * @throws IOException
   */
  public List<ContainerInformation> getContainers() throws IOException {
    Messages.GetLiveContainersResponseProto response = appMaster
        .getLiveContainers(Messages.GetLiveContainersRequestProto.newBuilder()
            .build());
    return unmarshall(response);
  }

  /**
   * Enumerate the live components, keyed by the names returned by the AM.
   * @return a map of name to component information
   * @throws IOException on IPC failure, or if the number of names and
   * records in the response differ
   */
  public Map<String, ComponentInformation> enumComponents() throws IOException {
    Messages.GetLiveComponentsResponseProto response =
        appMaster.getLiveComponents(
            Messages.GetLiveComponentsRequestProto.newBuilder().build());
    int namesCount = response.getNamesCount();
    checkNameRecordCounts(namesCount, response.getComponentsCount());
    Map<String, ComponentInformation> map =
        new HashMap<String, ComponentInformation>(namesCount);
    for (int i = 0; i < namesCount; i++) {
      map.put(response.getNames(i), unmarshall(response.getComponents(i)));
    }
    return map;
  }

  /**
   * Verify that a parallel names/records response is consistent.
   * @param namesCount number of names in the response
   * @param records number of records in the response
   * @throws IOException if the counts differ
   */
  private static void checkNameRecordCounts(int namesCount, int records)
      throws IOException {
    if (namesCount != records) {
      throw new IOException("Number of names returned (" + namesCount
          + ") does not match the number of records returned: "
          + records);
    }
  }

  /**
   * Get information about a single live component.
   * @param componentName component name
   * @return the component information
   * @throws IOException
   */
  public ComponentInformation getComponent(String componentName)
      throws IOException {
    Messages.GetLiveComponentRequestProto request =
        Messages.GetLiveComponentRequestProto.newBuilder()
            .setName(componentName)
            .build();
    Messages.ComponentInformationProto proto =
        appMaster.getLiveComponent(request);
    return unmarshall(proto);
  }

  /**
   * Ping the AM. NOTE: not implemented. This makes no call and always
   * returns {@code null}.
   * @param text text to ping with (currently unused)
   * @return always {@code null}
   * @throws IOException never thrown by the current implementation
   */
  public PingInformation ping(String text) throws IOException {
    return null;
  }

  /**
   * Stop the AM immediately by invoking {@link #amSuicide(String, int, int)}
   * with exit code 3 and no delay.
   * @param text text to log
   * @throws IOException
   */
  public void stop(String text) throws IOException {
    amSuicide(text, 3, 0);
  }

  /**
   * Get the application liveness. Equivalent to
   * {@link #getLivenessInformation()}.
   * @return current liveness information
   * @throws IOException
   */
  public ApplicationLivenessInformation getApplicationLiveness() throws
      IOException {
    return getLivenessInformation();
  }

  /**
   * Request a client certificate store from the AM.
   * @param hostname host name; omitted from the request if null
   * @param clientId requester ID
   * @param password store password, sent in the request
   * @param type store type
   * @return the unmarshalled store bytes
   * @throws IOException
   */
  public byte[] getClientCertificateStore(String hostname, String clientId,
      String password, String type) throws IOException {
    Messages.GetCertificateStoreRequestProto.Builder
        builder = Messages.GetCertificateStoreRequestProto.newBuilder();
    if (hostname != null) {
      builder.setHostname(hostname);
    }
    Messages.GetCertificateStoreRequestProto requestProto =
        builder.setRequesterId(clientId)
            .setPassword(password)
            .setType(type)
            .build();
    Messages.GetCertificateStoreResponseProto response =
        appMaster.getClientCertificateStore(requestProto);
    return unmarshall(response);
  }
}