// blob: 5a975da7af7fc3304449638379c312d10c69cdba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.zetasketch;
import com.google.zetasketch.HyperLogLogPlusPlus;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@code PTransform}s to compute HyperLogLogPlusPlus (HLL++) sketches on data streams based on the
* <a href="https://github.com/google/zetasketch">ZetaSketch</a> implementation.
*
* <p>HLL++ is an algorithm implemented by Google that estimates the count of distinct elements in a
* data stream. HLL++ requires significantly less memory than the linear memory needed for exact
* computation, at the cost of a small error. Cardinalities of arbitrary breakdowns can be computed
* using the HLL++ sketch. See this <a
* href="http://static.googleusercontent.com/media/research.google.com/en/us/pubs/archive/40671.pdf">published
* paper</a> for details about the algorithm.
*
* <p>HLL++ functions are also supported in <a
* href="https://cloud.google.com/bigquery/docs/reference/standard-sql/hll_functions">Google Cloud
* BigQuery</a>. The {@code HllCount PTransform}s provided here produce and consume sketches
* compatible with BigQuery.
*
* <p>For detailed design of this class, see https://s.apache.org/hll-in-beam.
*
* <h3>Examples</h3>
*
* <h4>Example 1: Create long-type sketch for a {@code PCollection<Long>} and specify precision</h4>
*
* <pre>{@code
* PCollection<Long> input = ...;
* int p = ...;
* PCollection<byte[]> sketch = input.apply(HllCount.Init.forLongs().withPrecision(p).globally());
* }</pre>
*
* <h4>Example 2: Create bytes-type sketch for a {@code PCollection<KV<String, byte[]>>}</h4>
*
* <pre>{@code
* PCollection<KV<String, byte[]>> input = ...;
* PCollection<KV<String, byte[]>> sketch = input.apply(HllCount.Init.forBytes().perKey());
* }</pre>
*
* <h4>Example 3: Merge existing sketches in a {@code PCollection<byte[]>} into a new one</h4>
*
* <pre>{@code
* PCollection<byte[]> sketches = ...;
* PCollection<byte[]> mergedSketch = sketches.apply(HllCount.MergePartial.globally());
* }</pre>
*
* <h4>Example 4: Estimate the count of distinct elements in a {@code PCollection<String>}</h4>
*
* <pre>{@code
* PCollection<String> input = ...;
* PCollection<Long> countDistinct =
* input.apply(HllCount.Init.forStrings().globally()).apply(HllCount.Extract.globally());
* }</pre>
*
* <p>Note: Currently {@code HllCount} does not work on FnAPI workers. See <a
* href="https://issues.apache.org/jira/browse/BEAM-7879">Jira ticket [BEAM-7879]</a>.
*/
@Experimental
public final class HllCount {
// Logger shared by the nested transforms; the Extract DoFns use it to warn about null sketches.
private static final Logger LOG = LoggerFactory.getLogger(HllCount.class);
/**
 * The minimum {@code precision} value you can set in {@link Init.Builder#withPrecision(int)} is
 * {@value}.
 *
 * <p>Mirrors {@code HyperLogLogPlusPlus.MINIMUM_PRECISION}.
 */
public static final int MINIMUM_PRECISION = HyperLogLogPlusPlus.MINIMUM_PRECISION;
/**
 * The maximum {@code precision} value you can set in {@link Init.Builder#withPrecision(int)} is
 * {@value}.
 *
 * <p>Mirrors {@code HyperLogLogPlusPlus.MAXIMUM_PRECISION}.
 */
public static final int MAXIMUM_PRECISION = HyperLogLogPlusPlus.MAXIMUM_PRECISION;
/**
 * The {@code precision} value used when {@link Init.Builder#withPrecision(int)} is not called is
 * {@value}.
 *
 * <p>Mirrors {@code HyperLogLogPlusPlus.DEFAULT_NORMAL_PRECISION}.
 */
public static final int DEFAULT_PRECISION = HyperLogLogPlusPlus.DEFAULT_NORMAL_PRECISION;
// Cannot be instantiated. This class is intended to be a namespace only; all functionality lives
// in the nested Init, MergePartial and Extract classes.
private HllCount() {}
/**
 * Factory namespace for {@code PTransform}s that aggregate input elements into HLL++ sketches.
 * Four input types are supported: {@code Integer}, {@code Long}, {@code String}, and {@code
 * byte[]}.
 *
 * <p>Every sketch is represented as a {@code byte[]}. Sketches of the same type can be combined
 * into a new sketch with {@link HllCount.MergePartial}, and the estimated count of distinct
 * elements can be read out of a sketch with {@link HllCount.Extract}.
 *
 * <p>If the input {@code PCollection} is empty, an "empty sketch" (a byte array of length 0) is
 * returned.
 *
 * <p>Corresponds to the {@code HLL_COUNT.INIT(input [, precision])} function in <a
 * href="https://cloud.google.com/bigquery/docs/reference/standard-sql/hll_functions">BigQuery</a>.
 */
public static final class Init {

  // Namespace only; never instantiated.
  private Init() {}

  /**
   * Starts building a {@code HllCount.Init} combining {@code PTransform} that produces
   * integer-type HLL++ sketches. Finish the returned {@link Builder} with {@link
   * Builder#globally()} or {@link Builder#perKey()}.
   *
   * <p>{@link Builder#globally()} yields a {@code PTransform} from {@code PCollection<Integer>}
   * to {@code PCollection<byte[]>}; the output holds the integer-type HLL++ sketch computed from
   * the elements of the input {@code PCollection}.
   *
   * <p>{@link Builder#perKey()} yields a {@code PTransform} from {@code PCollection<KV<K,
   * Integer>>} to {@code PCollection<KV<K, byte[]>>}; the output holds, for each key, the
   * integer-type HLL++ sketch computed from the values matching that key.
   *
   * <p>Integer-type sketches cannot be merged with sketches of other types.
   *
   * @return a new builder for integer-type sketches
   */
  public static Builder<Integer> forIntegers() {
    return new Builder<Integer>(HllCountInitFn.forInteger());
  }

  /**
   * Starts building a {@code HllCount.Init} combining {@code PTransform} that produces long-type
   * HLL++ sketches. Finish the returned {@link Builder} with {@link Builder#globally()} or {@link
   * Builder#perKey()}.
   *
   * <p>{@link Builder#globally()} yields a {@code PTransform} from {@code PCollection<Long>} to
   * {@code PCollection<byte[]>}; the output holds the long-type HLL++ sketch computed from the
   * elements of the input {@code PCollection}.
   *
   * <p>{@link Builder#perKey()} yields a {@code PTransform} from {@code PCollection<KV<K, Long>>}
   * to {@code PCollection<KV<K, byte[]>>}; the output holds, for each key, the long-type HLL++
   * sketch computed from the values matching that key.
   *
   * <p>Long-type sketches cannot be merged with sketches of other types.
   *
   * @return a new builder for long-type sketches
   */
  public static Builder<Long> forLongs() {
    return new Builder<Long>(HllCountInitFn.forLong());
  }

  /**
   * Starts building a {@code HllCount.Init} combining {@code PTransform} that produces
   * string-type HLL++ sketches. Finish the returned {@link Builder} with {@link
   * Builder#globally()} or {@link Builder#perKey()}.
   *
   * <p>{@link Builder#globally()} yields a {@code PTransform} from {@code PCollection<String>} to
   * {@code PCollection<byte[]>}; the output holds the string-type HLL++ sketch computed from the
   * elements of the input {@code PCollection}.
   *
   * <p>{@link Builder#perKey()} yields a {@code PTransform} from {@code PCollection<KV<K,
   * String>>} to {@code PCollection<KV<K, byte[]>>}; the output holds, for each key, the
   * string-type HLL++ sketch computed from the values matching that key.
   *
   * <p>String-type sketches cannot be merged with sketches of other types.
   *
   * @return a new builder for string-type sketches
   */
  public static Builder<String> forStrings() {
    return new Builder<String>(HllCountInitFn.forString());
  }

  /**
   * Starts building a {@code HllCount.Init} combining {@code PTransform} that produces bytes-type
   * HLL++ sketches. Finish the returned {@link Builder} with {@link Builder#globally()} or {@link
   * Builder#perKey()}.
   *
   * <p>{@link Builder#globally()} yields a {@code PTransform} from {@code PCollection<byte[]>} to
   * {@code PCollection<byte[]>}; the output holds the bytes-type HLL++ sketch computed from the
   * elements of the input {@code PCollection}.
   *
   * <p>{@link Builder#perKey()} yields a {@code PTransform} from {@code PCollection<KV<K,
   * byte[]>>} to {@code PCollection<KV<K, byte[]>>}; the output holds, for each key, the
   * bytes-type HLL++ sketch computed from the values matching that key.
   *
   * <p>Bytes-type sketches cannot be merged with sketches of other types.
   *
   * @return a new builder for bytes-type sketches
   */
  public static Builder<byte[]> forBytes() {
    return new Builder<byte[]>(HllCountInitFn.forBytes());
  }

  /**
   * Builder for the {@code HllCount.Init} combining {@code PTransform}.
   *
   * <p>Optionally call {@link #withPrecision(int)} to customize the {@code precision} parameter
   * of the sketch, then call {@link #globally()} or {@link #perKey()} to finalize the {@code
   * PTransform}.
   *
   * @param <InputT> element type or value type in {@code KV}s of the input {@code PCollection} to
   *     the {@code PTransform} being built
   */
  public static final class Builder<InputT> {

    // Combine function handed to Combine.globally / Combine.perKey; withPrecision configures it.
    private final HllCountInitFn<InputT, ?> combineFn;

    private Builder(HllCountInitFn<InputT, ?> combineFn) {
      this.combineFn = combineFn;
    }

    /**
     * Explicitly set the {@code precision} parameter used to compute HLL++ sketch.
     *
     * <p>Valid range is between {@link HllCount#MINIMUM_PRECISION} and {@link
     * HllCount#MAXIMUM_PRECISION}. If this method is not called, {@link
     * HllCount#DEFAULT_PRECISION} will be used. Sketches computed using different {@code
     * precision}s cannot be merged together.
     *
     * @param precision the {@code precision} parameter used to compute HLL++ sketch
     * @return this builder, to allow call chaining
     */
    public Builder<InputT> withPrecision(int precision) {
      // NOTE(review): the MergePartial documentation in this file says that merging sketches of
      // different precisions yields the minimum precision, which appears to conflict with the
      // "cannot be merged together" sentence above -- confirm which statement is intended.
      combineFn.setPrecision(precision);
      return this;
    }

    /**
     * Finalizes the builder into a {@link Combine.Globally} {@code PTransform} that takes an
     * input {@code PCollection<InputT>} and returns a {@code PCollection<byte[]>} holding the
     * HLL++ sketch computed from the elements in the input {@code PCollection}.
     *
     * <p>Returns a singleton {@code PCollection} with an "empty sketch" (byte array of length 0)
     * if the input {@code PCollection} is empty.
     *
     * @return the global combining transform
     */
    public Combine.Globally<InputT, byte[]> globally() {
      final Combine.Globally<InputT, byte[]> transform = Combine.globally(combineFn);
      return transform;
    }

    /**
     * Finalizes the builder into a {@link Combine.PerKey} {@code PTransform} that takes an input
     * {@code PCollection<KV<K, InputT>>} and returns a {@code PCollection<KV<K, byte[]>>} holding
     * the per-key HLL++ sketch computed from the values matching each key in the input {@code
     * PCollection}.
     *
     * @param <K> key type of the input and output {@code KV}s
     * @return the per-key combining transform
     */
    public <K> Combine.PerKey<K, InputT, byte[]> perKey() {
      final Combine.PerKey<K, InputT, byte[]> transform = Combine.perKey(combineFn);
      return transform;
    }
  }
}
/**
 * Factory namespace for {@code PTransform}s that merge HLL++ sketches into a new sketch.
 *
 * <p>Only sketches of the same type can be merged together. If incompatible sketches are
 * provided, a runtime error will occur.
 *
 * <p>If sketches of different {@code precision}s are merged, the merged sketch will get the
 * minimum precision encountered among all the input sketches.
 *
 * <p>If the input {@code PCollection} is empty, an "empty sketch" (a byte array of length 0) is
 * returned.
 *
 * <p>Corresponds to the {@code HLL_COUNT.MERGE_PARTIAL(sketch)} function in <a
 * href="https://cloud.google.com/bigquery/docs/reference/standard-sql/hll_functions">BigQuery</a>.
 */
public static final class MergePartial {

  // Namespace only; never instantiated.
  private MergePartial() {}

  /**
   * Builds a {@link Combine.Globally} {@code PTransform} that consumes a {@code
   * PCollection<byte[]>} of HLL++ sketches and produces a {@code PCollection<byte[]>} containing
   * a new sketch merged from the input sketches.
   *
   * <p>Only sketches of the same type can be merged together. If incompatible sketches are
   * provided, a runtime error will occur.
   *
   * <p>If sketches of different {@code precision}s are merged, the merged sketch will get the
   * minimum precision encountered among all the input sketches.
   *
   * <p>Returns a singleton {@code PCollection} with an "empty sketch" (byte array of length 0) if
   * the input {@code PCollection} is empty.
   *
   * @return the global merging transform
   */
  public static Combine.Globally<byte[], byte[]> globally() {
    final Combine.Globally<byte[], byte[]> mergeAll =
        Combine.globally(HllCountMergePartialFn.create());
    return mergeAll;
  }

  /**
   * Builds a {@link Combine.PerKey} {@code PTransform} that consumes a {@code
   * PCollection<KV<K, byte[]>>} of (key, HLL++ sketch) pairs and produces a {@code
   * PCollection<KV<K, byte[]>>} of (key, new sketch merged from the input sketches under the
   * key).
   *
   * <p>Only sketches of the same type can be merged together. If incompatible sketches are
   * provided, a runtime error will occur.
   *
   * <p>If sketches of different {@code precision}s are merged, the merged sketch will get the
   * minimum precision encountered among all the input sketches.
   *
   * @param <K> key type of the input and output {@code KV}s
   * @return the per-key merging transform
   */
  public static <K> Combine.PerKey<K, byte[], byte[]> perKey() {
    final Combine.PerKey<K, byte[], byte[]> mergeByKey =
        Combine.perKey(HllCountMergePartialFn.create());
    return mergeByKey;
  }
}
/**
 * Provides {@code PTransform}s to extract the estimated count of distinct elements (as {@code
 * Long}s) from each HLL++ sketch.
 *
 * <p>When extracting from an "empty sketch" represented by a byte array of length 0, the result
 * returned is 0. A {@code null} sketch is treated the same way, and a warning is logged.
 *
 * <p>Corresponds to the {@code HLL_COUNT.EXTRACT(sketch)} function in <a
 * href="https://cloud.google.com/bigquery/docs/reference/standard-sql/hll_functions">BigQuery</a>.
 */
public static final class Extract {

  // Warning logged whenever a null sketch is encountered; shared by both transforms.
  private static final String NULL_SKETCH_WARNING =
      "Received a null and treated it as an empty sketch. "
          + "Consider replacing nulls with empty byte arrays (byte[0]) "
          + "in upstream transforms for better space-efficiency and safety.";

  // Cannot be instantiated. This class is intended to be a namespace only.
  private Extract() {}

  /**
   * Returns a {@code PTransform} that takes an input {@code PCollection<byte[]>} of HLL++
   * sketches and returns a {@code PCollection<Long>} of the estimated count of distinct elements
   * extracted from each sketch.
   *
   * <p>Returns 0 if the input element is an "empty sketch" (byte array of length 0).
   *
   * @return the element-wise extracting transform
   */
  public static PTransform<PCollection<byte[]>, PCollection<Long>> globally() {
    return new Globally();
  }

  /**
   * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, byte[]>>} of (key,
   * HLL++ sketch) pairs and returns a {@code PCollection<KV<K, Long>>} of (key, estimated count
   * of distinct elements extracted from each sketch).
   *
   * <p>The count is 0 for any pair whose value is an "empty sketch" (byte array of length 0).
   *
   * @param <K> key type of the input and output {@code KV}s
   * @return the per-pair extracting transform
   */
  public static <K> PTransform<PCollection<KV<K, byte[]>>, PCollection<KV<K, Long>>> perKey() {
    return new PerKey<K>();
  }

  /**
   * Computes the estimated distinct count encoded in a single serialized sketch.
   *
   * <p>This is the single place where the null / empty / non-empty cases are handled, so that
   * the global and per-key transforms cannot drift apart.
   *
   * @param sketch serialized HLL++ sketch; may be {@code null} or of length 0
   * @return 0 for a {@code null} sketch (after logging a warning) or an empty sketch; otherwise
   *     the result of the sketch parsed by {@code HyperLogLogPlusPlus.forProto}
   */
  private static long estimateCount(byte[] sketch) {
    if (sketch == null) {
      LOG.warn(NULL_SKETCH_WARNING);
      return 0L;
    }
    if (sketch.length == 0) {
      return 0L;
    }
    return HyperLogLogPlusPlus.forProto(sketch).result();
  }

  /** Element-wise transform: one output count per input sketch. */
  private static final class Globally extends PTransform<PCollection<byte[]>, PCollection<Long>> {
    @Override
    public PCollection<Long> expand(PCollection<byte[]> input) {
      return input.apply(
          ParDo.of(
              new DoFn<byte[], Long>() {
                @ProcessElement
                public void processElement(
                    @Element byte[] sketch, OutputReceiver<Long> receiver) {
                  receiver.output(Extract.estimateCount(sketch));
                }
              }));
    }
  }

  /** Per-pair transform: keeps each key and replaces its sketch with the extracted count. */
  private static final class PerKey<K>
      extends PTransform<PCollection<KV<K, byte[]>>, PCollection<KV<K, Long>>> {
    @Override
    public PCollection<KV<K, Long>> expand(PCollection<KV<K, byte[]>> input) {
      return input.apply(
          ParDo.of(
              new DoFn<KV<K, byte[]>, KV<K, Long>>() {
                @ProcessElement
                public void processElement(
                    @Element KV<K, byte[]> kv, OutputReceiver<KV<K, Long>> receiver) {
                  receiver.output(KV.of(kv.getKey(), Extract.estimateCount(kv.getValue())));
                }
              }));
    }
  }
}
}