blob: fc901664be5a1b3f8ca285fa98f31d620c0e2af6 [file] [log] [blame]
/*
* Copyright (C) 2014 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.transforms;
/**
* {@code PTransform}s for computing the maximum of the elements in a
* {@code PCollection}, or the maximum of the values associated with
* each key in a {@code PCollection} of {@code KV}s.
*
* <p> Example 1: get the maximum of a {@code PCollection} of {@code Double}s.
* <pre> {@code
* PCollection<Double> input = ...;
* PCollection<Double> max = input.apply(Max.doublesGlobally());
* } </pre>
*
* <p> Example 2: calculate the maximum of the {@code Integer}s
* associated with each unique key (which is of type {@code String}).
* <pre> {@code
* PCollection<KV<String, Integer>> input = ...;
* PCollection<KV<String, Integer>> maxPerKey = input
* .apply(Max.<String>integersPerKey());
* } </pre>
*/
public class Max {
/**
* Returns a {@code PTransform} that takes an input
* {@code PCollection<Integer>} and returns a
* {@code PCollection<Integer>} whose contents is the maximum of the
* input {@code PCollection}'s elements, or
* {@code Integer.MIN_VALUE} if there are no elements.
*/
public static Combine.Globally<Integer, Integer> integersGlobally() {
Combine.Globally<Integer, Integer> combine = Combine
.globally(new MaxIntegerFn());
combine.setName("Max");
return combine;
}
/**
* Returns a {@code PTransform} that takes an input
* {@code PCollection<KV<K, Integer>>} and returns a
* {@code PCollection<KV<K, Integer>>} that contains an output
* element mapping each distinct key in the input
* {@code PCollection} to the maximum of the values associated with
* that key in the input {@code PCollection}.
*
* <p> See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
Combine.PerKey<K, Integer, Integer> combine = Combine
.perKey(new MaxIntegerFn());
combine.setName("Max.PerKey");
return combine;
}
/**
* Returns a {@code PTransform} that takes an input
* {@code PCollection<Long>} and returns a
* {@code PCollection<Long>} whose contents is the maximum of the
* input {@code PCollection}'s elements, or
* {@code Long.MIN_VALUE} if there are no elements.
*/
public static Combine.Globally<Long, Long> longsGlobally() {
Combine.Globally<Long, Long> combine = Combine.globally(new MaxLongFn());
combine.setName("Max");
return combine;
}
/**
* Returns a {@code PTransform} that takes an input
* {@code PCollection<KV<K, Long>>} and returns a
* {@code PCollection<KV<K, Long>>} that contains an output
* element mapping each distinct key in the input
* {@code PCollection} to the maximum of the values associated with
* that key in the input {@code PCollection}.
*
* <p> See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
Combine.PerKey<K, Long, Long> combine = Combine
.perKey(new MaxLongFn());
combine.setName("Max.PerKey");
return combine;
}
/**
* Returns a {@code PTransform} that takes an input
* {@code PCollection<Double>} and returns a
* {@code PCollection<Double>} whose contents is the maximum of the
* input {@code PCollection}'s elements, or
* {@code Double.MIN_VALUE} if there are no elements.
*/
public static Combine.Globally<Double, Double> doublesGlobally() {
Combine.Globally<Double, Double> combine = Combine
.globally(new MaxDoubleFn());
combine.setName("Max");
return combine;
}
/**
* Returns a {@code PTransform} that takes an input
* {@code PCollection<KV<K, Double>>} and returns a
* {@code PCollection<KV<K, Double>>} that contains an output
* element mapping each distinct key in the input
* {@code PCollection} to the maximum of the values associated with
* that key in the input {@code PCollection}.
*
* <p> See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
Combine.PerKey<K, Double, Double> combine = Combine
.perKey(new MaxDoubleFn());
combine.setName("Max.PerKey");
return combine;
}
/////////////////////////////////////////////////////////////////////////////
/**
* A {@code SerializableFunction} that computes the maximum of an
* {@code Iterable} of numbers of type {@code N}, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*
* @param <N> the type of the {@code Number}s being compared
*/
@SuppressWarnings("serial")
public static class MaxFn<N extends Number & Comparable<N>>
implements SerializableFunction<Iterable<N>, N> {
/** The smallest value of type N. */
private final N initialValue;
/**
* Constructs a combining function that computes the maximum over
* a collection of values of type {@code N}, given the smallest
* value of type {@code N}, which is the identity value for the
* maximum operation over {@code N}s.
*/
public MaxFn(N initialValue) {
this.initialValue = initialValue;
}
@Override
public N apply(Iterable<N> input) {
N max = initialValue;
for (N value : input) {
if (value.compareTo(max) > 0) {
max = value;
}
}
return max;
}
}
/**
* A {@code SerializableFunction} that computes the maximum of an
* {@code Iterable} of {@code Integer}s, useful as an argument to
* {@link Combine#globally} or {@link Combine#perKey}.
*/
@SuppressWarnings("serial")
public static class MaxIntegerFn extends MaxFn<Integer> {
public MaxIntegerFn() { super(Integer.MIN_VALUE); }
}
/**
* A {@code SerializableFunction} that computes the maximum of an
* {@code Iterable} of {@code Long}s, useful as an argument to
* {@link Combine#globally} or {@link Combine#perKey}.
*/
@SuppressWarnings("serial")
public static class MaxLongFn extends MaxFn<Long> {
public MaxLongFn() { super(Long.MIN_VALUE); }
}
/**
* A {@code SerializableFunction} that computes the maximum of an
* {@code Iterable} of {@code Double}s, useful as an argument to
* {@link Combine#globally} or {@link Combine#perKey}.
*/
@SuppressWarnings("serial")
public static class MaxDoubleFn extends MaxFn<Double> {
public MaxDoubleFn() { super(Double.MIN_VALUE); }
}
}