blob: a61eab93c02a9b3e15a5124aa2447ac06999cc3c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.tools.pigstats.spark;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigWarning;
import org.apache.pig.tools.pigstats.PigWarnCounter;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class SparkCounters implements Serializable, PigWarnCounter {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(SparkCounters.class);
private Map<String, SparkCounterGroup> sparkCounterGroups;
private final transient JavaSparkContext javaSparkContext;
private SparkCounters() {
this(null);
}
public SparkCounters(JavaSparkContext javaSparkContext) {
this.javaSparkContext = javaSparkContext;
this.sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
}
public void createCounter(Enum<?> key) {
createCounter(key.getDeclaringClass().getName(), key.name());
}
public void createCounter(String groupName, Enum<?> key) {
createCounter(groupName, key.name(), 0L);
}
public void createCounter(String groupName, String counterName) {
createCounter(groupName, counterName, 0L);
}
public void createCounter(String groupName, String counterName, Object initValue) {
getGroup(groupName).createCounter(counterName, initValue);
}
public void increment(Enum<?> key, long incrValue) {
increment(key.getDeclaringClass().getName(), key.name(), incrValue);
}
public void increment(String groupName, String counterName, long value) {
SparkCounter counter = getGroup(groupName).getCounter(counterName);
if (counter == null) {
LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
} else {
counter.increment(value);
}
}
public Object getValue(String groupName, String counterName) {
SparkCounter counter = getGroup(groupName).getCounter(counterName);
if (counter == null) {
LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
return null;
} else {
return counter.getValue();
}
}
public SparkCounter getCounter(String groupName, String counterName) {
return getGroup(groupName).getCounter(counterName);
}
public SparkCounter getCounter(Enum<?> key) {
return getCounter(key.getDeclaringClass().getName(), key.name());
}
private SparkCounterGroup getGroup(String groupName) {
SparkCounterGroup group = sparkCounterGroups.get(groupName);
if (group == null) {
group = new SparkCounterGroup.LongSparkCounterGroup(groupName, groupName, javaSparkContext);
sparkCounterGroups.put(groupName, group);
}
return group;
}
public Map<String, SparkCounterGroup> getSparkCounterGroups() {
return sparkCounterGroups;
}
@Override
public boolean incrWarnCounter(Enum<?> name, Object incr) {
SparkCounter counter = getCounter(PigWarning.SPARK_WARN);
return _incrWarnCounter(counter, name.name(), (Long) incr);
}
@Override
public boolean incrWarnCounter(String group, String name, Object incr) {
SparkCounter counter = getCounter(PigWarning.SPARK_CUSTOM_WARN);
return _incrWarnCounter(counter, group+"::"+name, (Long) incr);
}
private static boolean _incrWarnCounter(SparkCounter counter, String name, Long incr) {
if (counter == null){
return false;
}
Map<String,Long> map = new HashMap<String,Long>();
map.put(name, incr);
counter.increment(map);
return true;
}
}