blob: 9ade12d578c473ef2d4e0385195706528e8c49af [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* LossyCounting utility, bounded data structure that maintains approximate high frequency
* elements in data stream.
*
* Bucket size is 1 / error rate. (Error rate is 0.02 by default)
* Lemma If element does not appear in set, then is frequency is less than e * N
* (N is total element counts until now.)
* Based on paper:
* http://www.vldb.org/conf/2002/S10P03.pdf
*/
@InterfaceAudience.Private
public class LossyCounting<T> {
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
private final ExecutorService executor;
private long bucketSize;
private int currentTerm;
private Map<T, Integer> data;
private long totalDataCount;
private final String name;
private LossyCountingListener<T> listener;
private static AtomicReference<Future<?>> fut = new AtomicReference<>(null);
public interface LossyCountingListener<T> {
void sweep(T key);
}
LossyCounting(String name, double errorRate) {
this(name, errorRate, null);
}
public LossyCounting(String name, double errorRate, LossyCountingListener<T> listener) {
this.name = name;
if (errorRate < 0.0 || errorRate > 1.0) {
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
}
this.bucketSize = (long) Math.ceil(1 / errorRate);
this.currentTerm = 1;
this.totalDataCount = 0;
this.data = new ConcurrentHashMap<>();
this.listener = listener;
calculateCurrentTerm();
executor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lossy-count-%d").build());
}
LossyCounting(String name, Configuration conf) {
this(name, conf, null);
}
public LossyCounting(String name, Configuration conf, LossyCountingListener<T> listener) {
this(name, conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), listener);
}
private void addByOne(T key) {
//If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
//we create a new entry starting with currentTerm so that it will not be pruned immediately
data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
//update totalDataCount and term
totalDataCount++;
calculateCurrentTerm();
}
public void add(T key) {
addByOne(key);
if(totalDataCount % bucketSize == 0) {
//sweep the entries at bucket boundaries
//run Sweep
Future<?> future = fut.get();
if (future != null && !future.isDone()){
return;
}
future = executor.submit(new SweepRunnable());
fut.set(future);
}
}
/**
* sweep low frequency data
*/
public void sweep() {
for(Map.Entry<T, Integer> entry : data.entrySet()) {
if(entry.getValue() < currentTerm) {
T metric = entry.getKey();
data.remove(metric);
if (listener != null) {
listener.sweep(metric);
}
}
}
}
/**
* Calculate and set current term
*/
private void calculateCurrentTerm() {
this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
}
public long getBucketSize(){
return bucketSize;
}
public long getDataSize() {
return data.size();
}
public boolean contains(T key) {
return data.containsKey(key);
}
public Set<T> getElements(){
return data.keySet();
}
public long getCurrentTerm() {
return currentTerm;
}
class SweepRunnable implements Runnable {
@Override public void run() {
if (LOG.isTraceEnabled()) {
LOG.trace("Starting sweep of lossyCounting-" + name);
}
try {
sweep();
} catch (Exception exception) {
LOG.debug("Error while sweeping of lossyCounting-{}", name, exception);
}
}
}
public Future<?> getSweepFuture() {
return fut.get();
}
}