| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; |
| |
| import java.util.Collection; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy; |
| import org.apache.hadoop.yarn.server.api.ContainerLogContext; |
| import org.apache.hadoop.yarn.server.api.ContainerType; |
| |
| /** |
| * The sample policy samples logs of successful worker containers to aggregate. |
| * It always aggregates AM container and failed/killed worker |
| * containers' logs. To make sure small applications have enough logs, it only |
| * applies sampling beyond minimal number of containers. The parameters can be |
| * configured by SAMPLE_RATE and MIN_THRESHOLD. For example if SAMPLE_RATE is |
| * 0.2 and MIN_THRESHOLD is 20, for an application with 100 successful |
| * worker containers, 20 + (100-20) * 0.2 = 36 containers's logs will be |
| * aggregated. |
| */ |
| @Private |
| public class SampleContainerLogAggregationPolicy implements |
| ContainerLogAggregationPolicy { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(SampleContainerLogAggregationPolicy.class); |
| |
| static String SAMPLE_RATE = "SR"; |
| public static final float DEFAULT_SAMPLE_RATE = 0.2f; |
| |
| static String MIN_THRESHOLD = "MIN"; |
| public static final int DEFAULT_SAMPLE_MIN_THRESHOLD = 20; |
| |
| private float sampleRate = DEFAULT_SAMPLE_RATE; |
| private int minThreshold = DEFAULT_SAMPLE_MIN_THRESHOLD; |
| |
| static public String buildParameters(float sampleRate, int minThreshold) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append(SAMPLE_RATE).append(":").append(sampleRate).append(","). |
| append(MIN_THRESHOLD).append(":").append(minThreshold); |
| return sb.toString(); |
| } |
| |
| // Parameters are comma separated properties, for example |
| // "SR:0.5,MIN:50" |
| public void parseParameters(String parameters) { |
| Collection<String> params = StringUtils.getStringCollection(parameters); |
| for(String param : params) { |
| // The first element is the property name. |
| // The second element is the property value. |
| String[] property = StringUtils.getStrings(param, ":"); |
| if (property == null || property.length != 2) { |
| continue; |
| } |
| if (property[0].equals(SAMPLE_RATE)) { |
| try { |
| float sampleRate = Float.parseFloat(property[1]); |
| if (sampleRate >= 0.0 && sampleRate <= 1.0) { |
| this.sampleRate = sampleRate; |
| } else { |
| LOG.warn("The format isn't valid. Sample rate falls back to the " + |
| "default value " + DEFAULT_SAMPLE_RATE); |
| } |
| } catch (NumberFormatException nfe) { |
| LOG.warn("The format isn't valid. Sample rate falls back to the " + |
| "default value " + DEFAULT_SAMPLE_RATE); |
| } |
| } else if (property[0].equals(MIN_THRESHOLD)) { |
| try { |
| int minThreshold = Integer.parseInt(property[1]); |
| if (minThreshold >= 0) { |
| this.minThreshold = minThreshold; |
| } else { |
| LOG.warn("The format isn't valid. Min threshold falls back to " + |
| "the default value " + DEFAULT_SAMPLE_MIN_THRESHOLD); |
| } |
| } catch (NumberFormatException nfe) { |
| LOG.warn("The format isn't valid. Min threshold falls back to the " + |
| "default value " + DEFAULT_SAMPLE_MIN_THRESHOLD); |
| } |
| } |
| } |
| } |
| |
| public boolean shouldDoLogAggregation(ContainerLogContext logContext) { |
| if (logContext.getContainerType() == |
| ContainerType.APPLICATION_MASTER || logContext.getExitCode() != 0) { |
| // If it is AM or failed or killed container, enable log aggregation. |
| return true; |
| } |
| |
| // Only sample log aggregation for large applications. |
| // We assume the container id is continuously allocated from number 1 and |
| // Worker containers start from id 2. So logs of worker containers with ids |
| // in [2, minThreshold + 1] will be aggregated. |
| if ((logContext.getContainerId().getContainerId() & |
| ContainerId.CONTAINER_ID_BITMASK) < minThreshold + 2) { |
| return true; |
| } |
| |
| // Sample log aggregation for the rest of successful worker containers |
| return (sampleRate != 0 && |
| logContext.getContainerId().hashCode() % (1/sampleRate) == 0); |
| } |
| } |