blob: 03f548c7a249e8b766fe26070ff614405c757ceb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
/**
* The sample policy samples logs of successful worker containers to aggregate.
* It always aggregates AM container and failed/killed worker
* containers' logs. To make sure small applications have enough logs, it only
* applies sampling beyond minimal number of containers. The parameters can be
* configured by SAMPLE_RATE and MIN_THRESHOLD. For example if SAMPLE_RATE is
* 0.2 and MIN_THRESHOLD is 20, for an application with 100 successful
* worker containers, 20 + (100-20) * 0.2 = 36 containers' logs will be
* aggregated.
*/
@Private
public class SampleContainerLogAggregationPolicy implements
    ContainerLogAggregationPolicy {
  private static final Logger LOG =
      LoggerFactory.getLogger(SampleContainerLogAggregationPolicy.class);

  /** Property name of the sample rate inside the parameter string. */
  static final String SAMPLE_RATE = "SR";
  /** Sample rate used when none, or an invalid one, is configured. */
  public static final float DEFAULT_SAMPLE_RATE = 0.2f;

  /** Property name of the minimal threshold inside the parameter string. */
  static final String MIN_THRESHOLD = "MIN";
  /** Threshold used when none, or an invalid one, is configured. */
  public static final int DEFAULT_SAMPLE_MIN_THRESHOLD = 20;

  private float sampleRate = DEFAULT_SAMPLE_RATE;
  private int minThreshold = DEFAULT_SAMPLE_MIN_THRESHOLD;

  /**
   * Builds the parameter string understood by
   * {@link #parseParameters(String)}, for example {@code "SR:0.5,MIN:50"}.
   *
   * @param sampleRate the sample rate to encode
   * @param minThreshold the minimal threshold to encode
   * @return the comma separated {@code name:value} properties
   */
  public static String buildParameters(float sampleRate, int minThreshold) {
    return SAMPLE_RATE + ":" + sampleRate + ","
        + MIN_THRESHOLD + ":" + minThreshold;
  }

  /**
   * Parses comma separated {@code name:value} properties, for example
   * {@code "SR:0.5,MIN:50"}, and updates the sample rate and the minimal
   * threshold accordingly. A property that is malformed or unknown is
   * ignored with a warning; a known property with an invalid value resets
   * that setting to its default, as the warning message states.
   *
   * @param parameters the parameter string to parse
   */
  public void parseParameters(String parameters) {
    Collection<String> params = StringUtils.getStringCollection(parameters);
    for (String param : params) {
      // The first element is the property name, the second one its value.
      String[] property = StringUtils.getStrings(param, ":");
      if (property == null || property.length != 2) {
        LOG.warn("Ignoring malformed log aggregation policy parameter '{}'",
            param);
        continue;
      }
      // Tolerate blanks around the separators, e.g. "SR:0.5, MIN:50".
      String name = property[0].trim();
      String value = property[1].trim();
      if (SAMPLE_RATE.equals(name)) {
        sampleRate = parseSampleRate(value);
      } else if (MIN_THRESHOLD.equals(name)) {
        minThreshold = parseMinThreshold(value);
      } else {
        LOG.warn("Ignoring unknown log aggregation policy parameter '{}'",
            name);
      }
    }
  }

  /**
   * Converts the textual sample rate; returns the default when the text is
   * not a number within [0, 1].
   */
  private static float parseSampleRate(String value) {
    try {
      float rate = Float.parseFloat(value);
      // NaN fails both comparisons and is therefore rejected as well.
      if (rate >= 0.0f && rate <= 1.0f) {
        return rate;
      }
    } catch (NumberFormatException nfe) {
      // Not a number: handled by the shared warning below.
    }
    LOG.warn("The format isn't valid. Sample rate falls back to the "
        + "default value {}", DEFAULT_SAMPLE_RATE);
    return DEFAULT_SAMPLE_RATE;
  }

  /**
   * Converts the textual minimal threshold; returns the default when the
   * text is not a non-negative integer.
   */
  private static int parseMinThreshold(String value) {
    try {
      int threshold = Integer.parseInt(value);
      if (threshold >= 0) {
        return threshold;
      }
    } catch (NumberFormatException nfe) {
      // Not an integer: handled by the shared warning below.
    }
    LOG.warn("The format isn't valid. Min threshold falls back to the "
        + "default value {}", DEFAULT_SAMPLE_MIN_THRESHOLD);
    return DEFAULT_SAMPLE_MIN_THRESHOLD;
  }

  /**
   * Decides whether the logs of the given container are aggregated.
   *
   * @param logContext describes the container being considered
   * @return {@code true} for an AM container, for a container with a
   *         non-zero exit code, for a container within the minimal
   *         threshold, and for the sampled share of the remaining ones
   */
  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
    if (logContext.getContainerType() == ContainerType.APPLICATION_MASTER
        || logContext.getExitCode() != 0) {
      // If it is AM or failed or killed container, enable log aggregation.
      return true;
    }

    // Only sample log aggregation for large applications.
    // We assume the container id is continuously allocated from number 1
    // and worker containers start from id 2. So logs of worker containers
    // with ids in [2, minThreshold + 1] will be aggregated.
    ContainerId containerId = logContext.getContainerId();
    long maskedId =
        containerId.getContainerId() & ContainerId.CONTAINER_ID_BITMASK;
    // Add in long arithmetic: minThreshold + 2 would wrap around to a
    // negative int for thresholds close to Integer.MAX_VALUE.
    if (maskedId < minThreshold + 2L) {
      return true;
    }

    // Sample log aggregation for the rest of successful worker containers.
    return isSampled(containerId.hashCode(), sampleRate);
  }

  /**
   * Deterministically selects a {@code rate} share of all hash values.
   * The hash is read as an unsigned position p and is selected whenever the
   * running quota floor(p * rate) advances at p, so any window of
   * consecutive positions holds the configured share for every rate in
   * [0, 1]. A floating-point remainder against 1/rate is not used because
   * it is almost never exactly zero when 1/rate is not a whole number, and
   * widening the hash to float drops low-order bits of large values.
   */
  private static boolean isSampled(int hash, float rate) {
    if (rate <= 0) {
      return false;
    }
    long position = hash & 0xFFFFFFFFL;
    // double represents the 32-bit position exactly.
    double quotaRate = rate;
    return (long) ((position + 1) * quotaRate)
        > (long) (position * quotaRate);
  }
}