blob: bc4e8d64de574e375f80d1b000b33c2a485063ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.appMaster.http;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.yarn.appMaster.ClusterController;
import org.apache.drill.yarn.appMaster.ClusterControllerImpl;
import org.apache.drill.yarn.appMaster.ControllerVisitor;
import org.apache.drill.yarn.appMaster.Task;
import org.apache.drill.yarn.appMaster.Task.TrackingState;
import org.apache.drill.yarn.appMaster.TaskState;
import org.apache.drill.yarn.appMaster.TaskVisitor;
import org.apache.drill.yarn.core.DoYUtil;
import org.apache.drill.yarn.core.DrillOnYarnConfig;
import org.apache.drill.yarn.zk.ZKRegistry;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
public abstract class AbstractTasksModel {
/**
 * Web UI view of a single task. Most values are copied out of the
 * {@link Task} when the model is built; {@link #getHost()} and
 * {@link #getLink()} are computed on demand from the retained container and
 * endpoint and from the Drill-on-YARN configuration.
 */
public static class TaskModel {

  /**
   * Hint text for each task state. Built once and shared by every instance
   * instead of being rebuilt for each task shown in the UI.
   */
  private static final Map<TaskState, String> STATE_HINTS = makeStateHints();

  /** Hint text for each ZooKeeper tracking state; shared by every instance. */
  private static final Map<TrackingState, String> TRACKING_STATE_HINTS =
      makeTrackingStateHints();

  public int id;
  protected String groupName;
  protected boolean isLive;
  protected TaskState taskState;
  protected String taskStateHint;
  protected String state;
  protected boolean cancelled;
  protected String trackingState;
  protected String trackingStateHint;
  protected Container container;
  protected DrillbitEndpoint endpoint;
  protected long startTime;
  protected int memoryMb;
  protected int vcores;
  protected double disks;
  protected String containerId;
  protected String nmLink;
  protected long endTime;
  protected String disposition;
  protected int tryCount;

  /**
   * Captures the display state of a task.
   *
   * @param task the task to display
   * @param live true when building the live task view; false for history
   *        entries, in which case the end time, try count and disposition are
   *        captured as well
   */
  public TaskModel(Task task, boolean live) {
    id = task.taskId;
    groupName = task.scheduler.getName();
    taskState = task.getState();
    taskStateHint = STATE_HINTS.get(taskState);
    state = taskState.getLabel();
    cancelled = task.isCancelled();
    // Only a RUNNING task in the live view is reported as live.
    isLive = live && taskState == TaskState.RUNNING;
    TrackingState tState = task.getTrackingState();
    trackingState = tState.getDisplayName();
    trackingStateHint = TRACKING_STATE_HINTS.get(tState);
    container = task.container;
    startTime = task.launchTime;
    if (task.container != null) {
      containerId = task.container.getId().toString();
      Resource resource = task.container.getResource();
      memoryMb = resource.getMemory();
      vcores = resource.getVirtualCores();
      disks = task.getContainerSpec().disks;
      // Emulate the NM link. Used for debugging, gets us to
      // the page on the NM UI for this container so we can see
      // logs, etc.
      nmLink = "http://" + task.container.getNodeHttpAddress();
    } else {
      // No container allocated: fall back to the scheduler's resource settings.
      memoryMb = task.scheduler.getResource().memoryMb;
      vcores = task.scheduler.getResource().vCores;
    }
    endpoint = (DrillbitEndpoint) task.properties
        .get(ZKRegistry.ENDPOINT_PROPERTY);
    if (!live) {
      endTime = task.completionTime;
      tryCount = task.tryCount;
      // Determine disposition from most general to most
      // specific sources of information; later sources win.
      disposition = state;
      if (task.disposition != null) {
        disposition = task.disposition.toString();
      }
      if (task.completionStatus != null) {
        disposition = reformatDiagnostics(task.completionStatus.getDiagnostics());
      }
      if (task.error != null) {
        // An error without a message would otherwise blank out the
        // disposition; fall back to the error's string form instead.
        String message = task.error.getMessage();
        disposition = (message != null) ? message : task.error.toString();
      }
    }
  }

  /** Parser states for {@link #reformatDiagnostics(String)}. */
  private enum FormatState { PRE_STACK, IN_STACK, POST_STACK }

  /**
   * YARN diagnostics are verbose: they contain a stack trace of the YARN node
   * manager thread (not Drill), and contain blank lines, the container ID,
   * etc. Remove unnecessary cruft to make the diagnostics simpler and smaller
   * in the Web UI.
   * <p>
   * Before the stack trace, lines starting with "Container id:" are dropped.
   * The "Stack trace:" line and every line after it, up to and including the
   * first blank line, are dropped. Remaining blank lines are dropped. Each kept
   * line is followed by {@code <br/>} and a newline, and the result ends with
   * "See log file for details."
   *
   * @param orig YARN diagnostics; {@code null} is treated as empty
   * @return cleaned-up version, as an HTML fragment
   */
  public static String reformatDiagnostics(String orig) {
    String text = (orig == null) ? "" : orig;
    try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
      StringBuilder buf = new StringBuilder();
      String line;
      FormatState state = FormatState.PRE_STACK;
      while ((line = reader.readLine()) != null) {
        switch (state) {
        case PRE_STACK:
          if (line.startsWith("Container id:")) {
            continue;
          }
          if (line.startsWith("Stack trace:")) {
            state = FormatState.IN_STACK;
            continue;
          }
          break;
        case IN_STACK:
          // Skip the NM stack trace; a blank line marks its end.
          if (line.trim().isEmpty()) {
            state = FormatState.POST_STACK;
          }
          continue;
        case POST_STACK:
        default:
          break;
        }
        if (line.trim().isEmpty()) {
          continue;
        }
        buf.append(line);
        buf.append("<br/>\n");
      }
      buf.append("See log file for details.");
      return buf.toString();
    } catch (IOException e) {
      // Will never occur for an in-memory reader. But, if the impossible
      // happens, just return the original diagnostics.
      return text.replace("\n", "<br/>\n");
    }
  }

  /** Builds the immutable task-state hint table. */
  private static Map<TaskState, String> makeStateHints() {
    Map<TaskState, String> hints = new HashMap<>();
    hints.put(TaskState.START, "Queued to send a container request to YARN.");
    hints.put(TaskState.REQUESTING, "Container request sent to YARN.");
    hints.put(TaskState.LAUNCHING,
        "YARN provided a container, send launch request.");
    hints.put(TaskState.WAIT_START_ACK,
        "Drillbit launched, waiting for ZooKeeper registration.");
    hints.put(TaskState.RUNNING, "Drillbit is running normally.");
    hints.put(TaskState.ENDING,
        "Graceful shutdown request sent to the Drillbit.");
    hints.put(TaskState.KILLING,
        "Sent the YARN Node Manager a request to forcefully kill the Drillbit.");
    hints.put(TaskState.WAIT_END_ACK,
        "Drillbit has shut down; waiting for ZooKeeper to confirm.");
    // The UI will never display the END state.
    hints.put(TaskState.END, "The Drillbit has shut down.");
    return Collections.unmodifiableMap(hints);
  }

  /** Builds the immutable tracking-state hint table. */
  private static Map<TrackingState, String> makeTrackingStateHints() {
    Map<TrackingState, String> hints = new HashMap<>();
    // UNTRACKED state is not used by Drillbits.
    hints.put(TrackingState.UNTRACKED, "Task is not tracked in ZooKeeper.");
    hints.put(TrackingState.NEW,
        "Drillbit has not yet registered with ZooKeeper.");
    hints.put(TrackingState.START_ACK,
        "Drillbit has registered normally with ZooKeeper.");
    hints.put(TrackingState.END_ACK,
        "Drillbit is no longer registered with ZooKeeper.");
    return Collections.unmodifiableMap(hints);
  }

  public String getTaskId() {
    return Integer.toString(id);
  }

  public String getGroupName() { return groupName; }

  public boolean isLive() {
    return isLive;
  }

  /** @return the container's node host, or "" if no container is assigned */
  public String getHost() {
    if (container == null) {
      return "";
    }
    return container.getNodeId().getHost();
  }

  /**
   * Builds a URL from the endpoint address and the configured Drillbit HTTP
   * port, using https when the configuration enables it.
   *
   * @return the URL, or "" if the task has no registered endpoint
   */
  public String getLink() {
    if (endpoint == null) {
      return "";
    }
    String port = DrillOnYarnConfig.config().getString(DrillOnYarnConfig.DRILLBIT_HTTP_PORT);
    String protocol = "http:";
    if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.DRILLBIT_USE_HTTPS)) {
      protocol = "https:";
    }
    return protocol + "//" + endpoint.getAddress() + ":" + port + "/";
  }

  public String getState() { return state; }
  public String getStateHint() { return taskStateHint; }
  public boolean isCancelled() { return cancelled; }

  public boolean isCancellable() {
    return !cancelled && taskState.isCancellable();
  }

  public String getTrackingState() { return trackingState; }
  public String getTrackingStateHint() { return trackingStateHint; }

  /** @return ISO-formatted launch time, or "" if the task never launched */
  public String getStartTime() {
    if (startTime == 0) {
      return "";
    }
    return DoYUtil.toIsoTime(startTime);
  }

  public int getMemory() { return memoryMb; }
  public int getVcores() { return vcores; }

  public String getDisks() {
    return String.format("%.2f", disks);
  }

  public boolean hasContainer() { return containerId != null; }
  public String getContainerId() { return displayString(containerId); }
  public String getNmLink() { return displayString(nmLink); }
  public String getDisposition() { return displayString(disposition); }
  public int getTryCount() { return tryCount; }

  /** @return {@code value}, or "" when it is {@code null} */
  public String displayString(String value) { return (value == null) ? "" : value; }

  /** @return ISO-formatted completion time, or "" if not recorded */
  public String getEndTime() {
    if (endTime == 0) {
      return "";
    }
    return DoYUtil.toIsoTime(endTime);
  }
}
/**
 * Web UI view of a Drillbit found in ZooKeeper that Drill-on-YARN did not
 * launch.
 */
public static class UnmanagedDrillbitModel {
  protected String host;
  protected String ports;

  /**
   * Splits an endpoint string of the form {@code host:p1:p2:p3} into the host
   * and a ", "-separated list of the three port fields. Fields beyond the
   * fourth are ignored.
   *
   * @param endpoint endpoint string; if it has fewer than four colon-separated
   *        fields, the whole string is used as the host and the ports are empty
   */
  public UnmanagedDrillbitModel(String endpoint) {
    String[] fields = endpoint.split(":");
    if (fields.length >= 4) {
      host = fields[0];
      List<String> portList = new ArrayList<>();
      for (int i = 1; i <= 3; i++) {
        portList.add(fields[i]);
      }
      ports = DoYUtil.join(", ", portList);
    } else {
      // Should never occur, but better safe than sorry.
      host = endpoint;
      ports = "";
    }
  }

  public String getHost() {
    return host;
  }

  public String getPorts() {
    return ports;
  }
}
/**
 * Whether the disk resource column should be shown. Not assigned anywhere in
 * this file; presumably set by the code that builds the model — verify.
 */
protected boolean supportsDisks;

/** Task models collected for display, in insertion order unless sorted. */
protected List<TaskModel> results = new ArrayList<>();

/** @return the collected task models (the live internal list) */
public List<TaskModel> getTasks() {
  return results;
}

/** @return true if at least one task model was collected */
public boolean hasTasks() {
  return results.size() > 0;
}

/** @return the value of {@code supportsDisks} */
public boolean supportsDiskResource() {
  return supportsDisks;
}
/**
 * Model for the live task page: one {@link TaskModel} per visited task, plus
 * optional anomaly information (unmanaged Drillbits and blacklisted nodes).
 */
public static class TasksModel extends AbstractTasksModel implements TaskVisitor {

  /** Drillbits registered in ZooKeeper but not launched by DoY; null if none were found. */
  protected List<UnmanagedDrillbitModel> unmanaged;

  /** Sorted snapshot of the node inventory's blacklist; null until {@link #listAnomalies} runs. */
  protected List<String> blacklist;

  @Override
  public void visit(Task task) {
    results.add(new TaskModel(task, true));
  }

  /**
   * Sort tasks by Task ID.
   */
  public void sortTasks() {
    Collections.sort(results, new Comparator<TaskModel>() {
      @Override
      public int compare(TaskModel t1, TaskModel t2) {
        return Integer.compare(t1.id, t2.id);
      }
    });
  }

  /**
   * List any anomalies: either stray Drillbits (those in ZK but not launched by DoY),
   * or blacklisted nodes.
   * <p>
   * To avoid race conditions, do not use the controller visitor to invoke this method.
   * Unmanaged Drillbits are gathered from the ZK registry without holding the
   * controller lock; the controller is locked only briefly while the blacklist
   * is copied. Sorting happens on that private copy, outside the lock.
   *
   * @param controller the cluster controller; must be a {@code ClusterControllerImpl}
   */
  public void listAnomalies(ClusterController controller) {
    listUnmanaged(controller);
    List<String> snapshot = new ArrayList<String>();
    synchronized (controller) {
      // Copy while holding the lock: the returned list may be owned by the
      // controller's node inventory, so it must not be read or sorted after
      // the lock is released.
      List<String> current = ((ClusterControllerImpl) controller).getNodeInventory().getBlacklist();
      if (current != null) {
        snapshot.addAll(current);
      }
    }
    Collections.sort(snapshot);
    blacklist = snapshot;
  }

  /** Records Drillbits the ZK registry reports as unmanaged, if any. */
  private void listUnmanaged(ClusterController controller) {
    ZKRegistry zkRegistry = (ZKRegistry) controller.getProperty(ZKRegistry.CONTROLLER_PROPERTY);
    if (zkRegistry == null) {
      return;
    }
    List<String> endpoints = zkRegistry.listUnmanagedDrillits();
    if (endpoints.isEmpty()) {
      return;
    }
    unmanaged = new ArrayList<>();
    for (String endpoint : endpoints) {
      unmanaged.add(new UnmanagedDrillbitModel(endpoint));
    }
  }

  /**
   * Original, misspelled getter; kept because external callers may depend on it.
   * Prefer {@link #getUnmanaged()}.
   */
  public List<UnmanagedDrillbitModel> getUnnamaged() { return unmanaged; }

  /** @return the unmanaged Drillbits, or {@code null} if none were found */
  public List<UnmanagedDrillbitModel> getUnmanaged() { return unmanaged; }

  public boolean hasUnmanagedDrillbits() { return unmanaged != null; }

  public int getUnmanagedDrillbitCount() {
    return (unmanaged == null) ? 0 : unmanaged.size();
  }

  /** @return true if the blacklist has been loaded and is non-empty */
  public boolean hasBlacklist() { return blacklist != null && !blacklist.isEmpty(); }

  /** @return number of blacklisted nodes; 0 if the blacklist was not loaded */
  public int getBlacklistCount() { return (blacklist == null) ? 0 : blacklist.size(); }

  /** @return the sorted blacklist; empty if {@link #listAnomalies} has not run */
  public List<String> getBlacklist() {
    return (blacklist == null) ? Collections.<String>emptyList() : blacklist;
  }
}
/**
 * Model for the task history page: one {@link TaskModel} per task in the
 * controller's history, built in non-live mode so that end time, try count
 * and disposition are included.
 */
public static class HistoryModel extends AbstractTasksModel implements ControllerVisitor {
  @Override
  public void visit(ClusterController controller) {
    // The history is read through the concrete controller implementation.
    for (Task finished : ((ClusterControllerImpl) controller).getHistory()) {
      TaskModel model = new TaskModel(finished, false);
      results.add(model);
    }
  }
}
}