| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.tez.dag.app; |
| |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| |
| import javax.annotation.Nullable; |
| |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.tez.dag.app.dag.Vertex; |
| |
| public class ContainerContext { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ContainerContext.class); |
| |
| private final Map<String, LocalResource> localResources; |
| private final Credentials credentials; |
| private final Map<String, String> environment; |
| private final String javaOpts; |
| private final Vertex vertex; |
| |
| // FIXME Add support for service meta data comparisons |
| |
| public ContainerContext(Map<String, LocalResource> localResources, |
| Credentials credentials, Map<String, String> environment, String javaOpts) { |
| Objects.requireNonNull(localResources, |
| "localResources should not be null"); |
| Objects.requireNonNull(credentials, "credentials should not be null"); |
| Objects.requireNonNull(environment, "environment should not be null"); |
| Objects.requireNonNull(javaOpts, "javaOpts should not be null"); |
| this.localResources = localResources; |
| this.credentials = credentials; |
| this.environment = environment; |
| this.javaOpts = javaOpts; |
| this.vertex = null; |
| } |
| |
| public ContainerContext(Map<String, LocalResource> localResources, |
| Credentials credentials, Map<String, String> environment, String javaOpts, |
| @Nullable Vertex vertex) { |
| Objects.requireNonNull(localResources, |
| "localResources should not be null"); |
| Objects.requireNonNull(credentials, "credentials should not be null"); |
| Objects.requireNonNull(environment, "environment should not be null"); |
| Objects.requireNonNull(javaOpts, "javaOpts should not be null"); |
| this.localResources = localResources; |
| this.credentials = credentials; |
| this.environment = environment; |
| this.javaOpts = javaOpts; |
| this.vertex = vertex; |
| } |
| |
| public Map<String, LocalResource> getLocalResources() { |
| return this.localResources; |
| } |
| |
| public Credentials getCredentials() { |
| return this.credentials; |
| } |
| |
| public Map<String, String> getEnvironment() { |
| return this.environment; |
| } |
| |
| public String getJavaOpts() { |
| return this.javaOpts; |
| } |
| |
| /** |
| * @return true if this ContainerContext is a super-set of the specified |
| * container context. |
| */ |
| public boolean isSuperSet(ContainerContext otherContext) { |
| Objects.requireNonNull(otherContext, "otherContext should not null"); |
| // Assumptions: |
| // Credentials are the same for all containers belonging to a DAG. |
| // Matching can be added if containers are used across DAGs |
| |
| // Match javaOpts |
| if (!this.javaOpts.equals(otherContext.javaOpts)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Incompatible java opts, " |
| + ", this=" + this.javaOpts |
| + ", other=" + otherContext.javaOpts); |
| } |
| return false; |
| } |
| |
| return isSuperSet(this.environment, otherContext.getEnvironment(), "Environment") |
| && localResourcesCompatible(this.localResources, otherContext.getLocalResources()); |
| } |
| |
| /** |
| * @return true if this ContainerContext is an exact match of the specified |
| * container context. |
| */ |
| public boolean isExactMatch(ContainerContext otherContext) { |
| return (this.vertex == otherContext.vertex); |
| } |
| |
| // TODO Once LRs are handled via YARN, remove this check - and ensure |
| // YarnTezDAGChild knows how to handle the additional types in terms of |
| // classpath modification |
| private static boolean localResourcesCompatible(Map<String, LocalResource> srcLRs, |
| Map<String, LocalResource> reqLRs) { |
| for (Entry<String, LocalResource> reqLREntry : reqLRs.entrySet()) { |
| LocalResource requestedLocalResource = srcLRs.get(reqLREntry.getKey()); |
| if (requestedLocalResource == null) { |
| LocalResource lr = reqLREntry.getValue(); |
| if (!LocalResourceType.FILE.equals(lr.getType())) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Cannot match container: Additional local resource needed is not of type FILE" |
| + ", resourceName: " + reqLREntry.getKey() |
| + ", resourceDetails: " + reqLREntry); |
| } |
| return false; |
| } |
| } else if(!reqLREntry.getValue().equals(requestedLocalResource)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Cannot match container: Attempting to use same target resource name: " |
| + reqLREntry.getKey() |
| + ", but with different source resources. Already localized: " |
| + requestedLocalResource + ", requested: " + reqLREntry.getValue()); |
| } |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private static <K, V> boolean isSuperSet(Map<K, V> srcMap, Map<K, V> matchMap, |
| String matchInfo) { |
| for (Entry<K, V> oEntry : matchMap.entrySet()) { |
| K oKey = oEntry.getKey(); |
| V oVal = oEntry.getValue(); |
| V srcVal = srcMap.get(oKey); |
| if (!oVal.equals(srcVal)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Incompatible container context" |
| + ", matchInfo=" + matchInfo |
| + ", thisKey=" + oKey |
| + ", thisVal=" + srcVal |
| + ", otherVal=" + oVal); |
| } |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Create a new ContainerContext to account for container re-use. On re-use, there is |
| * re-localization of additional LocalResources. Also, a task from a different vertex could be |
| * run on the given container. |
| * |
| * Only a merge of local resources is needed as: |
| * |
| * credentials are modified at run-time based on the task spec. |
| * the environment for a container cannot be changed. A re-used container is always |
| * expected to have a super-set. |
| * javaOpts have to be identical for re-use. |
| * |
| * Vertex should be overridden to account for the new task being scheduled to run on this |
| * container context. |
| * |
| * @param c1 ContainerContext 1 Original task's context |
| * @param c2 ContainerContext 2 Newly assigned task's context |
| * @return Merged ContainerContext |
| */ |
| public static ContainerContext union(ContainerContext c1, ContainerContext c2) { |
| HashMap<String, LocalResource> mergedLR = new HashMap<String, LocalResource>(); |
| mergedLR.putAll(c1.getLocalResources()); |
| mergedLR.putAll(c2.getLocalResources()); |
| ContainerContext union = new ContainerContext(mergedLR, c1.credentials, c1.environment, |
| c1.javaOpts, c2.vertex); |
| return union; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append("LocalResources: ["); |
| if (localResources != null) { |
| for (Map.Entry<String, LocalResource> lr : localResources.entrySet()) { |
| sb.append("[ name=") |
| .append(lr.getKey()) |
| .append(", value=") |
| .append(lr.getValue()) |
| .append("],"); |
| } |
| } |
| sb.append("], environment: ["); |
| if (environment != null) { |
| for (Map.Entry<String, String> entry : environment.entrySet()) { |
| sb.append("[ ").append(entry.getKey()).append("=").append(entry.getValue()) |
| .append(" ],"); |
| } |
| } |
| sb.append("], credentials(token kinds): ["); |
| if (credentials != null) { |
| for (Token<? extends TokenIdentifier> t : credentials.getAllTokens()) { |
| sb.append(t.getKind().toString()) |
| .append(","); |
| } |
| } |
| sb.append("], javaOpts: ") |
| .append(javaOpts) |
| .append(", vertex: ") |
| .append(( vertex == null ? "null" : vertex.getLogIdentifier())); |
| |
| return sb.toString(); |
| } |
| |
| } |