blob: 93c00ed4ab29124bfcbb850835580096f457afeb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.util;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.action.hadoop.PasswordMasker;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p> All
* instrumentation elements have a group and a name.
* @deprecated since 5.0.0
*/
@Deprecated
public class Instrumentation {
private ScheduledExecutorService scheduler;
private Lock counterLock;
private Lock timerLock;
private Lock variableLock;
private Lock samplerLock;
private Map<String, Map<String, Map<String, Object>>> all;
private Map<String, Map<String, Element<Long>>> counters;
private Map<String, Map<String, Element<Timer>>> timers;
private Map<String, Map<String, Element<Variable>>> variables;
private Map<String, Map<String, Element<Double>>> samplers;
/**
* Instrumentation constructor.
*/
@SuppressWarnings("unchecked")
public Instrumentation() {
counterLock = new ReentrantLock();
timerLock = new ReentrantLock();
variableLock = new ReentrantLock();
samplerLock = new ReentrantLock();
all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
}
/**
* Set the scheduler instance to handle the samplers.
*
* @param scheduler scheduler instance.
*/
public void setScheduler(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}
/**
* Cron is a stopwatch that can be started/stopped several times. <p> This class is not thread safe, it does not
* need to be. <p> It keeps track of the total time (first start to last stop) and the running time (total time
* minus the stopped intervals). <p> Once a Cron is complete it must be added to the corresponding group/name in a
* Instrumentation instance.
*/
public static class Cron {
private long start;
private long end;
private long lapStart;
private long own;
private long total;
private boolean running;
/**
* Creates new Cron, stopped, in zero.
*/
public Cron() {
running = false;
}
/**
* Start the cron. It cannot be already started.
*/
public void start() {
if (!running) {
if (lapStart == 0) {
lapStart = System.currentTimeMillis();
if (start == 0) {
start = lapStart;
end = start;
}
}
running = true;
}
}
/**
* Stops the cron. It cannot be already stopped.
*/
public void stop() {
if (running) {
end = System.currentTimeMillis();
if (start == 0) {
start = end;
}
total = end - start;
if (lapStart > 0) {
own += end - lapStart;
lapStart = 0;
}
running = false;
}
}
/**
* Return the start time of the cron. It must be stopped.
*
* @return the start time of the cron.
*/
public long getStart() {
if (running) {
throw new IllegalStateException("Timer running");
}
return start;
}
/**
* Return the end time of the cron. It must be stopped.
*
* @return the end time of the cron.
*/
public long getEnd() {
if (running) {
throw new IllegalStateException("Timer running");
}
return end;
}
/**
* Return the total time of the cron. It must be stopped.
*
* @return the total time of the cron.
*/
public long getTotal() {
if (running) {
throw new IllegalStateException("Timer running");
}
return total;
}
/**
* Return the own time of the cron. It must be stopped.
*
* @return the own time of the cron.
*/
public long getOwn() {
if (running) {
throw new IllegalStateException("Timer running");
}
return own;
}
}
/**
* Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p> Instrumentation element snapshots
* are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} ()} methods.
*/
public interface Element<T> {
/**
* Return the snapshot value of the Intrumentation element.
*
* @return the snapshot value of the Intrumentation element.
*/
T getValue();
}
/**
* Counter Instrumentation element.
*/
public static class Counter extends AtomicLong implements Element<Long> {
/**
* Return the counter snapshot.
*
* @return the counter snapshot.
*/
public Long getValue() {
return get();
}
/**
* Return the String representation of the counter value.
*
* @return the String representation of the counter value.
*/
public String toString() {
return Long.toString(get());
}
}
/**
* Timer Instrumentation element.
*/
public static class Timer implements Element<Timer> {
Lock lock = new ReentrantLock();
private long ownTime;
private long totalTime;
private long ticks;
private long ownSquareTime;
private long totalSquareTime;
private long ownMinTime;
private long ownMaxTime;
private long totalMinTime;
private long totalMaxTime;
/**
* Timer constructor. <p> It is project private for test purposes.
*/
Timer() {
}
/**
* Return the String representation of the timer value.
*
* @return the String representation of the timer value.
*/
public String toString() {
return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
}
/**
* Return the timer snapshot.
*
* @return the timer snapshot.
*/
public Timer getValue() {
try {
lock.lock();
Timer timer = new Timer();
timer.ownTime = ownTime;
timer.totalTime = totalTime;
timer.ticks = ticks;
timer.ownSquareTime = ownSquareTime;
timer.totalSquareTime = totalSquareTime;
timer.ownMinTime = ownMinTime;
timer.ownMaxTime = ownMaxTime;
timer.totalMinTime = totalMinTime;
timer.totalMaxTime = totalMaxTime;
return timer;
}
finally {
lock.unlock();
}
}
/**
* Add a cron to a timer. <p> It is project private for test purposes.
*
* @param cron Cron to add.
*/
void addCron(Cron cron) {
try {
lock.lock();
long own = cron.getOwn();
long total = cron.getTotal();
ownTime += own;
totalTime += total;
ticks++;
ownSquareTime += own * own;
totalSquareTime += total * total;
if (ticks == 1) {
ownMinTime = own;
ownMaxTime = own;
totalMinTime = total;
totalMaxTime = total;
}
else {
ownMinTime = Math.min(ownMinTime, own);
ownMaxTime = Math.max(ownMaxTime, own);
totalMinTime = Math.min(totalMinTime, total);
totalMaxTime = Math.max(totalMaxTime, total);
}
}
finally {
lock.unlock();
}
}
/**
* Return the own accumulated computing time by the timer.
*
* @return own accumulated computing time by the timer.
*/
public long getOwn() {
return ownTime;
}
/**
* Return the total accumulated computing time by the timer.
*
* @return total accumulated computing time by the timer.
*/
public long getTotal() {
return totalTime;
}
/**
* Return the number of times a cron was added to the timer.
*
* @return the number of times a cron was added to the timer.
*/
public long getTicks() {
return ticks;
}
/**
* Return the sum of the square own times. <p> It can be used to calculate the standard deviation.
*
* @return the sum of the square own timer.
*/
public long getOwnSquareSum() {
return ownSquareTime;
}
/**
* Return the sum of the square total times. <p> It can be used to calculate the standard deviation.
*
* @return the sum of the square own timer.
*/
public long getTotalSquareSum() {
return totalSquareTime;
}
/**
* Returns the own minimum time.
*
* @return the own minimum time.
*/
public long getOwnMin() {
return ownMinTime;
}
/**
* Returns the own maximum time.
*
* @return the own maximum time.
*/
public long getOwnMax() {
return ownMaxTime;
}
/**
* Returns the total minimum time.
*
* @return the total minimum time.
*/
public long getTotalMin() {
return totalMinTime;
}
/**
* Returns the total maximum time.
*
* @return the total maximum time.
*/
public long getTotalMax() {
return totalMaxTime;
}
/**
* Returns the own average time.
*
* @return the own average time.
*/
public long getOwnAvg() {
return (ticks != 0) ? ownTime / ticks : 0;
}
/**
* Returns the total average time.
*
* @return the total average time.
*/
public long getTotalAvg() {
return (ticks != 0) ? totalTime / ticks : 0;
}
/**
* Returns the total time standard deviation.
*
* @return the total time standard deviation.
*/
public double getTotalStdDev() {
return evalStdDev(ticks, totalTime, totalSquareTime);
}
/**
* Returns the own time standard deviation.
*
* @return the own time standard deviation.
*/
public double getOwnStdDev() {
return evalStdDev(ticks, ownTime, ownSquareTime);
}
private double evalStdDev(long n, long sn, long ssn) {
return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1)));
}
}
/**
* Add a cron to an instrumentation timer. The timer is created if it does not exists. <p> This method is thread
* safe.
*
* @param group timer group.
* @param name timer name.
* @param cron cron to add to the timer.
*/
public void addCron(String group, String name, Cron cron) {
Map<String, Element<Timer>> map = timers.get(group);
if (map == null) {
try {
timerLock.lock();
map = timers.get(group);
if (map == null) {
map = new HashMap<String, Element<Timer>>();
timers.put(group, map);
}
}
finally {
timerLock.unlock();
}
}
Timer timer = (Timer) map.get(name);
if (timer == null) {
try {
timerLock.lock();
timer = (Timer) map.get(name);
if (timer == null) {
timer = new Timer();
map.put(name, timer);
}
}
finally {
timerLock.unlock();
}
}
timer.addCron(cron);
}
/**
* Increment an instrumentation counter. The counter is created if it does not exists. <p> This method is thread
* safe.
*
* @param group counter group.
* @param name counter name.
* @param count increment to add to the counter.
*/
public void incr(String group, String name, long count) {
Map<String, Element<Long>> map = counters.get(group);
if (map == null) {
try {
counterLock.lock();
map = counters.get(group);
if (map == null) {
map = new HashMap<String, Element<Long>>();
counters.put(group, map);
}
}
finally {
counterLock.unlock();
}
}
Counter counter = (Counter) map.get(name);
if (counter == null) {
try {
counterLock.lock();
counter = (Counter) map.get(name);
if (counter == null) {
counter = new Counter();
map.put(name, counter);
}
}
finally {
counterLock.unlock();
}
}
counter.addAndGet(count);
}
/**
* Decrement an instrumentation counter. The counter is created if it does not exists. <p> This method is thread
* safe.
*
* @param group counter group.
* @param name counter name.
* @param count decrement to add to the counter.
*/
public void decr(final String group, final String name, final long count) {
incr(group, name, -count);
}
/**
* Interface for instrumentation variables. <p> For example a the database service could expose the number of
* currently active connections.
*/
public interface Variable<T> extends Element<T> {
}
/**
* Add an instrumentation variable. The variable must not exist. <p> This method is thread safe.
*
* @param group counter group.
* @param name counter name.
* @param variable variable to add.
*/
@SuppressWarnings("unchecked")
public void addVariable(String group, String name, Variable variable) {
Map<String, Element<Variable>> map = variables.get(group);
if (map == null) {
try {
variableLock.lock();
map = variables.get(group);
if (map == null) {
map = new HashMap<String, Element<Variable>>();
variables.put(group, map);
}
}
finally {
variableLock.unlock();
}
}
if (map.containsKey(name)) {
throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
}
map.put(name, variable);
}
/**
* Return the JVM system properties.
*
* @return JVM system properties.
*/
public Map<String, String> getJavaSystemProperties() {
Map<String, String> unmasked = Maps.fromProperties(System.getProperties());
return new PasswordMasker().mask(unmasked);
}
/**
* Return the OS environment used to start Oozie.
*
* @return the OS environment used to start Oozie.
*/
public Map<String, String> getOSEnv() {
Map<String, String> unmasked = System.getenv();
return new PasswordMasker().mask(unmasked);
}
/**
* Return the current system configuration as a Map&lt;String,String&gt;.
*
* @return the current system configuration as a Map&lt;String,String&gt;.
*/
public Map<String, String> getConfiguration() {
final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();
return new Map<String, String>() {
public int size() {
return maskedConf.size();
}
public boolean isEmpty() {
return maskedConf.size() == 0;
}
public boolean containsKey(Object o) {
return maskedConf.get((String) o) != null;
}
public boolean containsValue(Object o) {
throw new UnsupportedOperationException();
}
public String get(Object o) {
return maskedConf.get((String) o);
}
public String put(String s, String s1) {
throw new UnsupportedOperationException();
}
public String remove(Object o) {
throw new UnsupportedOperationException();
}
public void putAll(Map<? extends String, ? extends String> map) {
throw new UnsupportedOperationException();
}
public void clear() {
throw new UnsupportedOperationException();
}
public Set<String> keySet() {
Set<String> set = new LinkedHashSet<String>();
for (Entry<String, String> entry : maskedConf) {
set.add(entry.getKey());
}
return set;
}
public Collection<String> values() {
Set<String> set = new LinkedHashSet<String>();
for (Entry<String, String> entry : maskedConf) {
set.add(entry.getValue());
}
return set;
}
public Set<Entry<String, String>> entrySet() {
Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
for (Entry<String, String> entry : maskedConf) {
set.add(entry);
}
return set;
}
};
}
/**
* Return all the counters. <p> This method is thread safe. <p> The counters are live. The counter value is a
* snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
*
* @return all counters.
*/
public Map<String, Map<String, Element<Long>>> getCounters() {
return counters;
}
/**
* Return all the timers. <p> This method is thread safe. <p> The timers are live. Once a timer is obtained, all
* its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
* invoked.
*
* @return all counters.
*/
public Map<String, Map<String, Element<Timer>>> getTimers() {
return timers;
}
/**
* Return all the variables. <p> This method is thread safe. <p> The variables are live. The variable value is a
* snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
*
* @return all counters.
*/
public Map<String, Map<String, Element<Variable>>> getVariables() {
return variables;
}
/**
* Return a map containing all variables, counters and timers.
*
* @return a map containing all variables, counters and timers.
*/
public Map<String, Map<String, Map<String, Object>>> getAll() {
return all;
}
/**
* Return the string representation of the instrumentation.
*
* @return the string representation of the instrumentation.
*/
public String toString() {
String E = System.getProperty("line.separator");
StringBuilder sb = new StringBuilder(4096);
for (Map.Entry<String, Map<String, Map<String, Object>>> entry : all.entrySet()) {
String allKey = entry.getKey();
Map<String, Map<String, Object>> allValue = entry.getValue();
sb.append(allKey).append(':').append(E);
List<String> groups = new ArrayList<>(allValue.keySet());
Collections.sort(groups);
for (String group : groups) {
sb.append(" ").append(group).append(':').append(E);
List<String> names = new ArrayList<>(allValue.get(group).keySet());
Collections.sort(names);
for (String name : names) {
sb.append(" ").append(name).append(": ").append(((Element)allValue.
get(group).get(name)).getValue()).append(E);
}
}
}
return sb.toString();
}
private static class Sampler implements Element<Double>, Runnable {
private Lock lock = new ReentrantLock();
private int samplingInterval;
private Variable<Long> variable;
private long[] values;
private int current;
private long valuesSum;
private double rate;
public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
this.samplingInterval = samplingInterval;
this.variable = variable;
values = new long[samplingPeriod / samplingInterval];
valuesSum = 0;
current = -1;
}
public int getSamplingInterval() {
return samplingInterval;
}
public void run() {
try {
lock.lock();
long newValue = variable.getValue();
if (current == -1) {
valuesSum = newValue;
current = 0;
values[current] = newValue;
}
else {
current = (current + 1) % values.length;
valuesSum = valuesSum - values[current] + newValue;
values[current] = newValue;
}
rate = ((double) valuesSum) / values.length;
}
finally {
lock.unlock();
}
}
public Double getValue() {
return rate;
}
}
/**
* Add a sampling variable. <p> This method is thread safe.
*
* @param group timer group.
* @param name timer name.
* @param period sampling period to compute rate.
* @param interval sampling frequency, how often the variable is probed.
* @param variable variable to sample.
*/
public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
if (scheduler == null) {
throw new IllegalStateException("scheduler not set, cannot sample");
}
try {
samplerLock.lock();
Map<String, Element<Double>> map = samplers.get(group);
if (map == null) {
map = samplers.get(group);
if (map == null) {
map = new HashMap<String, Element<Double>>();
samplers.put(group, map);
}
}
if (map.containsKey(name)) {
throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
}
else {
Sampler sampler = new Sampler(period, interval, variable);
map.put(name, sampler);
scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
}
}
finally {
samplerLock.unlock();
}
}
/**
* Return all the samplers. <p> This method is thread safe. <p> The samplers are live. The sampler value is a
* snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
*
* @return all counters.
*/
public Map<String, Map<String, Element<Double>>> getSamplers() {
return samplers;
}
public void stop() {
}
}