blob: c116ed83f8b424bc7353e89c60cf165da542fbec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.yarn;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class SamzaAppState {
/**
* Represents an invalid or unknown Samza container ID.
*/
private static final int UNUSED_CONTAINER_ID = -1;
/**
* Job Coordinator is started in the AM and follows the {@link org.apache.samza.job.yarn.SamzaAppMasterService}
* lifecycle. It helps querying JobModel related info in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
* and locality information when host-affinity is enabled in {@link org.apache.samza.job.yarn.SamzaTaskManager}
*/
public final JobModelManager jobCoordinator;
/* The following state variables are primarily used for reference in the AM web services */
/**
* Task Id of the AM
* Used for displaying in the AM UI. Usage found in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
* and scalate/WEB-INF/views/index.scaml
*/
public final int taskId;
/**
* Id of the AM container (as allocated by the RM)
* Used for displaying in the AM UI. Usage in {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
* and scalate/WEB-INF/views/index.scaml
*/
public final ContainerId amContainerId;
/**
* Host name of the NM on which the AM is running
* Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
*/
public final String nodeHost;
/**
* NM port on which the AM is running
* Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
*/
public final int nodePort;
/**
* Http port of the NM on which the AM is running
* Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
*/
public final int nodeHttpPort;
/**
* Application Attempt Id as provided by Yarn
* Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
* and {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
*/
public final ApplicationAttemptId appAttemptId;
/**
* JMX Server URL, if enabled
* Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
*/
public String jmxUrl = "";
/**
* JMX Server Tunneling URL, if enabled
* Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml
*/
public String jmxTunnelingUrl = "";
/**
* Job Coordinator URL
* Usage in {@link org.apache.samza.job.yarn.SamzaAppMasterService} & ContainerUtil
*/
public URL coordinatorUrl = null;
/**
* URL of the {@link org.apache.samza.webapp.ApplicationMasterRestServlet}
*/
public URL rpcUrl = null;
/**
* URL of the {@link org.apache.samza.webapp.ApplicationMasterWebServlet}
*/
public URL trackingUrl = null;
/**
* The following state variables are required for the correct functioning of the TaskManager
* Some of them are shared between the AMRMCallbackThread and the ContainerAllocator thread, as mentioned below.
*/
/**
* Number of containers that have completed their execution and exited successfully
*/
public AtomicInteger completedContainers = new AtomicInteger(0);
/**
* Number of failed containers
* */
public AtomicInteger failedContainers = new AtomicInteger(0);
/**
* Number of containers released due to extra allocation returned by the RM
*/
public AtomicInteger releasedContainers = new AtomicInteger(0);
/**
* ContainerStatus of failed containers.
*/
public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
/**
* Number of containers configured for the job
*/
public int containerCount = 0;
/**
* Set of finished containers - TODO: Can be changed to a counter
*/
public Set<Integer> finishedContainers = new HashSet<Integer>();
/**
* Number of containers needed for the job to be declared healthy
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
public AtomicInteger neededContainers = new AtomicInteger(0);
/**
* Map of the samzaContainerId to the {@link org.apache.samza.job.yarn.YarnContainer} on which it is running
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
public ConcurrentMap<Integer, YarnContainer> runningContainers = new ConcurrentHashMap<Integer, YarnContainer>(0);
/**
* Final status of the application
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
public FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
/**
* State indicating whether the job is healthy or not
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
public AtomicBoolean jobHealthy = new AtomicBoolean(true);
public AtomicInteger containerRequests = new AtomicInteger(0);
public AtomicInteger matchedContainerRequests = new AtomicInteger(0);
public SamzaAppState(JobModelManager jobModelManager,
int taskId,
ContainerId amContainerId,
String nodeHost,
int nodePort,
int nodeHttpPort) {
this.jobCoordinator = jobModelManager;
this.taskId = taskId;
this.amContainerId = amContainerId;
this.nodeHost = nodeHost;
this.nodePort = nodePort;
this.nodeHttpPort = nodeHttpPort;
this.appAttemptId = amContainerId.getApplicationAttemptId();
}
/**
* Returns the Samza container ID if the specified YARN container ID corresponds to a running container.
*
* @param yarnContainerId the YARN container ID.
* @return the Samza container ID if it is running,
* otherwise {@link SamzaAppState#UNUSED_CONTAINER_ID}.
*/
public int getRunningSamzaContainerId(ContainerId yarnContainerId) {
int containerId = UNUSED_CONTAINER_ID;
for(Map.Entry<Integer, YarnContainer> entry: runningContainers.entrySet()) {
if(entry.getValue().id().equals(yarnContainerId)) {
containerId = entry.getKey();
break;
}
}
return containerId;
}
/**
* @param samzaContainerId the Samza container ID to validate.
* @return {@code true} if the ID is valid, {@code false} otherwise
*/
public static boolean isValidContainerId(int samzaContainerId) {
return samzaContainerId != UNUSED_CONTAINER_ID;
}
}