| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.core.client.summary; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.UnaryOperator; |
| import java.util.stream.Collectors; |
| |
| import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.commons.lang.mutable.MutableLong; |
| |
| /** |
| * This class counts arbitrary keys while defending against too many keys and keys that are too |
| * long. |
| * |
| * <p> |
| * During collection and summarization this class will use the functions from {@link #converter()} |
| * and {@link #encoder()}. For each key/value the function from {@link #converter()} will be called |
| * to create zero or more counter objects. A counter associated with each counter object will be |
| * incremented, as long as there are not too many counters and the counter object is not too long. |
| * |
| * <p> |
| * When {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)} is called, the function |
| * from {@link #encoder()} will be used to convert counter objects to strings. These strings will be |
| * used to emit statistics. Overriding {@link #encoder()} is optional. One reason to override is if |
| * the counter object contains binary or special data. For example, a function that base64 encodes |
| * counter objects could be created. |
| * |
| * <p> |
| * If the counter key type is mutable, then consider overriding {@link #copier()}. |
| * |
| * <p> |
| * The function returned by {@link #converter()} will be called frequently and should be very |
| * efficient. The function returned by {@link #encoder()} will be called less frequently and can be |
| * more expensive. The reason these two functions exist is to avoid the conversion to string for |
| * each key value, if that conversion is unnecessary. |
| * |
| * <p> |
| * Below is an example implementation that counts column visibilities. This example avoids |
| * converting column visibility to string for each key/value. This example shows the source code for |
| * {@link VisibilitySummarizer}. |
| * |
| * <pre> |
| * <code> |
| * public class VisibilitySummarizer extends CountingSummarizer&lt;ByteSequence&gt; { |
| *   &#64;Override |
| *   protected UnaryOperator&lt;ByteSequence&gt; copier() { |
| *     // ByteSequences are mutable, so override and provide a copy function |
| *     return ArrayByteSequence::new; |
| *   } |
| * |
| *   &#64;Override |
| *   protected Converter&lt;ByteSequence&gt; converter() { |
| *     return (key, val, consumer) -&gt; consumer.accept(key.getColumnVisibilityData()); |
| *   } |
| * } |
| * </code> |
| * </pre> |
| * |
| * @param <K> |
| * The counter key type. This type must have good implementations of |
| * {@link Object#hashCode()} and {@link Object#equals(Object)}. |
| * @see CounterSummary |
| * @since 2.0.0 |
| */ |
| public abstract class CountingSummarizer<K> implements Summarizer { |
| |
| /** |
| * A configuration option for specifying the maximum number of unique counters an instance of this |
| * summarizer should track. If not specified, a default of {@value #MAX_COUNTER_DEFAULT} will be |
| * used. |
| */ |
| public static final String MAX_COUNTERS_OPT = "maxCounters"; |
| |
| /** |
| * A configuration option for specifying the maximum length of an individual counter key. If not |
| * specified, a default of {@value #MAX_CKL_DEFAULT} will be used. |
| */ |
| public static final String MAX_COUNTER_LEN_OPT = "maxCounterLen"; |
| |
| /** |
| * A configuration option to determine if delete keys should be counted. If set to true then |
| * delete keys will not be passed to the {@link Converter} and the statistic |
| * {@value #DELETES_IGNORED_STAT} will track the number of deleted ignored. This options defaults |
| * to {@value #INGNORE_DELETES_DEFAULT}. |
| */ |
| public static final String INGNORE_DELETES_OPT = "ignoreDeletes"; |
| |
| /** |
| * This prefixes all counters when emitting statistics in |
| * {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)}. |
| */ |
| public static final String COUNTER_STAT_PREFIX = "c:"; |
| |
| /** |
| * This is the name of the statistic that tracks how many counters objects were ignored because |
| * the number of unique counters was exceeded. The max number of unique counters is specified by |
| * {@link #MAX_COUNTERS_OPT}. |
| */ |
| public static final String TOO_MANY_STAT = "tooMany"; |
| |
| /** |
| * This is the name of the statistic that tracks how many counter objects were ignored because |
| * they were too long. The maximum length is specified by {@link #MAX_COUNTER_LEN_OPT}. |
| */ |
| public static final String TOO_LONG_STAT = "tooLong"; |
| |
| /** |
| * This is the name of the statistic that tracks the total number of counter objects emitted by |
| * the {@link Converter}. This includes emitted Counter objects that were ignored. |
| */ |
| public static final String EMITTED_STAT = "emitted"; |
| |
| /** |
| * This is the name of the statistic that tracks the total number of deleted keys seen. This |
| * statistic is only incremented when the {@value #INGNORE_DELETES_OPT} option is set to true. |
| */ |
| public static final String DELETES_IGNORED_STAT = "deletesIgnored"; |
| |
| /** |
| * This tracks the total number of key/values seen by the {@link Summarizer.Collector} |
| */ |
| public static final String SEEN_STAT = "seen"; |
| |
| // this default can not be changed as persisted summary data depends on it. See the documentation |
| // about persistence in the Summarizer class javadoc. |
| public static final String MAX_COUNTER_DEFAULT = "1024"; |
| |
| // this default can not be changed as persisted summary data depends on it |
| public static final String MAX_CKL_DEFAULT = "128"; |
| |
| // this default can not be changed as persisted summary data depends on it |
| public static final String INGNORE_DELETES_DEFAULT = "true"; |
| |
| private static final String[] ALL_STATS = {TOO_LONG_STAT, TOO_MANY_STAT, EMITTED_STAT, SEEN_STAT, |
| DELETES_IGNORED_STAT}; |
| |
| private int maxCounters; |
| private int maxCounterKeyLen; |
| private boolean ignoreDeletes; |
| |
| private void init(SummarizerConfiguration conf) { |
| maxCounters = Integer |
| .parseInt(conf.getOptions().getOrDefault(MAX_COUNTERS_OPT, MAX_COUNTER_DEFAULT)); |
| maxCounterKeyLen = Integer |
| .parseInt(conf.getOptions().getOrDefault(MAX_COUNTER_LEN_OPT, MAX_CKL_DEFAULT)); |
| ignoreDeletes = Boolean |
| .parseBoolean(conf.getOptions().getOrDefault(INGNORE_DELETES_OPT, INGNORE_DELETES_DEFAULT)); |
| } |
| |
| /** |
| * A function that converts key values to zero or more counter objects. |
| * |
| * @since 2.0.0 |
| */ |
| public interface Converter<K> { |
| /** |
| * @param consumer |
| * emit counter objects derived from key and value to this consumer |
| */ |
| void convert(Key k, Value v, Consumer<K> consumer); |
| } |
| |
| /** |
| * |
| * @return A function that is used to convert each key value to zero or more counter objects. Each |
| * function returned should be independent. |
| */ |
| protected abstract Converter<K> converter(); |
| |
| /** |
| * @return A function that is used to convert counter objects to String. The default function |
| * calls {@link Object#toString()} on the counter object. |
| */ |
| protected Function<K,String> encoder() { |
| return Object::toString; |
| } |
| |
| /** |
| * Override this if your key type is mutable and subject to change. |
| * |
| * @return a function that used to copy the counter object. This function is only used when the |
| * collector has never seen the counter object before. In this case the collector needs to |
| * possibly copy the counter object before using as map key. The default implementation is |
| * the {@link UnaryOperator#identity()} function. |
| */ |
| protected UnaryOperator<K> copier() { |
| return UnaryOperator.identity(); |
| } |
| |
| @Override |
| public Collector collector(SummarizerConfiguration sc) { |
| init(sc); |
| return new Collector() { |
| |
| // Map used for computing summary incrementally uses ByteSequence for key which is more |
| // efficient than converting String for each Key. The |
| // conversion to String is deferred until the summary is requested. |
| |
| private Map<K,MutableLong> counters = new HashMap<>(); |
| private long tooMany = 0; |
| private long tooLong = 0; |
| private long seen = 0; |
| private long emitted = 0; |
| private long deleted = 0; |
| private Converter<K> converter = converter(); |
| private Function<K,String> encoder = encoder(); |
| private UnaryOperator<K> copier = copier(); |
| |
| private void incrementCounter(K counter) { |
| emitted++; |
| |
| MutableLong ml = counters.get(counter); |
| if (ml == null) { |
| if (counters.size() >= maxCounters) { |
| // no need to store this counter in the map and get() it... just use instance variable |
| tooMany++; |
| } else { |
| // we have never seen this key before, check if its too long |
| if (encoder.apply(counter).length() >= maxCounterKeyLen) { |
| tooLong++; |
| } else { |
| counters.put(copier.apply(counter), new MutableLong(1)); |
| } |
| } |
| } else { |
| // using mutable long allows calling put() to be avoided |
| ml.increment(); |
| } |
| } |
| |
| @Override |
| public void accept(Key k, Value v) { |
| seen++; |
| if (ignoreDeletes && k.isDeleted()) { |
| deleted++; |
| } else { |
| converter.convert(k, v, this::incrementCounter); |
| } |
| } |
| |
| @Override |
| public void summarize(StatisticConsumer sc) { |
| StringBuilder sb = new StringBuilder(COUNTER_STAT_PREFIX); |
| |
| for (Entry<K,MutableLong> entry : counters.entrySet()) { |
| sb.setLength(COUNTER_STAT_PREFIX.length()); |
| sb.append(encoder.apply(entry.getKey())); |
| sc.accept(sb.toString(), entry.getValue().longValue()); |
| } |
| |
| sc.accept(TOO_MANY_STAT, tooMany); |
| sc.accept(TOO_LONG_STAT, tooLong); |
| sc.accept(EMITTED_STAT, emitted); |
| sc.accept(SEEN_STAT, seen); |
| sc.accept(DELETES_IGNORED_STAT, deleted); |
| } |
| }; |
| } |
| |
| @Override |
| public Combiner combiner(SummarizerConfiguration sc) { |
| init(sc); |
| return (summary1, summary2) -> { |
| |
| for (String key : ALL_STATS) { |
| summary1.merge(key, summary2.getOrDefault(key, 0L), Long::sum); |
| } |
| |
| for (Entry<String,Long> entry : summary2.entrySet()) { |
| String k2 = entry.getKey(); |
| Long v2 = entry.getValue(); |
| |
| if (k2.startsWith(COUNTER_STAT_PREFIX)) { |
| summary1.merge(k2, v2, Long::sum); |
| } |
| } |
| |
| if (summary1.size() - ALL_STATS.length > maxCounters) { |
| // find the keys with the lowest counts to remove |
| List<String> keysToRemove = summary1.entrySet().stream() |
| .filter(e -> e.getKey().startsWith(COUNTER_STAT_PREFIX)) // filter out non counters |
| .sorted((e1, e2) -> Long.compare(e2.getValue(), e1.getValue())) // sort descending by |
| // count |
| .skip(maxCounters) // skip most frequent |
| .map(Entry::getKey).collect(Collectors.toList()); // collect the least frequent |
| // counters in a list |
| |
| long removedCount = 0; |
| for (String key : keysToRemove) { |
| removedCount += summary1.remove(key); |
| } |
| |
| summary1.merge(TOO_MANY_STAT, removedCount, Long::sum); |
| } |
| }; |
| } |
| } |