/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.crunch.fn; |
| |
| import static org.apache.crunch.fn.Aggregators.*; |
| import static org.hamcrest.Matchers.closeTo; |
| import static org.hamcrest.Matchers.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertThat; |
| |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import org.apache.crunch.Aggregator; |
| import org.apache.crunch.CombineFn; |
| import org.apache.crunch.Pair; |
| import org.apache.crunch.Tuple3; |
| import org.apache.crunch.Tuple4; |
| import org.apache.crunch.TupleN; |
| import org.apache.crunch.impl.mem.emit.InMemoryEmitter; |
| import org.junit.Test; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| |
| |
| public class AggregatorsTest { |
| |
| @Test |
| public void testSums2() { |
| assertThat(sapply(SUM_INTS(), 1, 2, 3, -4), is(2)); |
| assertThat(sapply(SUM_LONGS(), 1L, 2L, 3L, -4L, 5000000000L), is(5000000002L)); |
| assertThat(sapply(SUM_FLOATS(), 1f, 2f, 3f, -4f), is(2f)); |
| assertThat(sapply(SUM_DOUBLES(), 0.1, 0.2, 0.3), is(closeTo(0.6, 0.00001))); |
| assertThat(sapply(SUM_BIGINTS(), bigInt("7"), bigInt("3")), is(bigInt("10"))); |
| assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("1.122"), bigDecimal("0.654")), is(bigDecimal("1.776"))); |
| } |
| |
| @Test |
| public void testSums() { |
| assertThat(sapply(SUM_LONGS(), 29L, 17L, 1729L), is(1775L)); |
| assertThat(sapply(SUM_LONGS(), 29L, 7L, 1729L), is(1765L)); |
| assertThat(sapply(SUM_INTS(), 29, 17, 1729), is(1775)); |
| assertThat(sapply(SUM_FLOATS(), 29f, 17f, 1729f), is(1775.0f)); |
| assertThat(sapply(SUM_DOUBLES(), 29.0, 17.0, 1729.0), is(1775.0)); |
| assertThat(sapply(SUM_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1775"))); |
| assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1777.739"))); |
| } |
| |
| @Test |
| public void testMax() { |
| assertThat(sapply(MAX_LONGS(), 29L, 17L, 1729L), is(1729L)); |
| assertThat(sapply(MAX_INTS(), 29, 17, 1729), is(1729)); |
| assertThat(sapply(MAX_FLOATS(), 29f, 17f, 1729f), is(1729.0f)); |
| assertThat(sapply(MAX_DOUBLES(), 29.0, 17.0, 1729.0), is(1729.0)); |
| assertThat(sapply(MAX_FLOATS(), 29f, 1745f, 17f, 1729f), is(1745.0f)); |
| assertThat(sapply(MAX_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1729"))); |
| assertThat(sapply(MAX_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1729.876"))); |
| assertThat(sapply(Aggregators.<String>MAX_COMPARABLES(), "b", "a", "d", "c"), is("d")); |
| } |
| |
| @Test |
| public void testMin() { |
| assertThat(sapply(MIN_LONGS(), 29L, 17L, 1729L), is(17L)); |
| assertThat(sapply(MIN_INTS(), 29, 17, 1729), is(17)); |
| assertThat(sapply(MIN_FLOATS(), 29f, 17f, 1729f), is(17.0f)); |
| assertThat(sapply(MIN_DOUBLES(), 29.0, 17.0, 1729.0), is(17.0)); |
| assertThat(sapply(MIN_INTS(), 29, 170, 1729), is(29)); |
| assertThat(sapply(MIN_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("17"))); |
| assertThat(sapply(MIN_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("17.876"))); |
| assertThat(sapply(Aggregators.<String>MIN_COMPARABLES(), "b", "a", "d", "c"), is("a")); |
| } |
| |
| @Test |
| public void testMaxN() { |
| assertThat(apply(MAX_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(98, 1009))); |
| assertThat(apply(MAX_N(1, String.class), "b", "a"), is(ImmutableList.of("b"))); |
| assertThat(apply(MAX_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("b", "c", "d"))); |
| assertThat(apply(MAX_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(3, 3))); |
| } |
| |
| @Test |
| public void testMaxUniqueN() { |
| assertThat(apply(MAX_UNIQUE_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(2, 3))); |
| } |
| |
| @Test |
| public void testMinN() { |
| assertThat(apply(MIN_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 29))); |
| assertThat(apply(MIN_N(1, String.class), "b", "a"), is(ImmutableList.of("a"))); |
| assertThat(apply(MIN_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("a", "b", "c"))); |
| assertThat(apply(MIN_N(2, Integer.class), 1, 1, 2, 3), is(ImmutableList.of(1, 1))); |
| } |
| |
| @Test |
| public void testMinUniqueN() { |
| assertThat(apply(MIN_UNIQUE_N(2, Integer.class), 3, 2, 1, 1), is(ImmutableList.of(1, 2))); |
| } |
| |
| @Test |
| public void testFirstN() { |
| assertThat(apply(Aggregators.<Integer>FIRST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 34))); |
| } |
| |
| @Test |
| public void testLastN() { |
| assertThat(apply(Aggregators.<Integer>LAST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(29, 1009))); |
| } |
| |
| @Test |
| public void testUniqueElements() { |
| assertThat(ImmutableSet.copyOf(apply(Aggregators.<Integer>UNIQUE_ELEMENTS(), 17, 29, 29, 16, 17)), |
| is(ImmutableSet.of(17, 29, 16))); |
| |
| Iterable<Integer> samp = apply(Aggregators.<Integer>SAMPLE_UNIQUE_ELEMENTS(2), 17, 29, 16, 17, 29, 16); |
| assertThat(Iterables.size(samp), is(2)); |
| assertThat(ImmutableSet.copyOf(samp).size(), is(2)); // check that the two elements are unique |
| } |
| |
| @Test |
| public void testPairs() { |
| List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14)); |
| Aggregator<Pair<Long, Double>> a = Aggregators.pairAggregator(SUM_LONGS(), MIN_DOUBLES()); |
| |
| assertThat(sapply(a, input), is(Pair.of(1729L, -3.14))); |
| } |
| |
| @Test |
| public void testPairsTwoLongs() { |
| List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L)); |
| Aggregator<Pair<Long, Long>> a = Aggregators.pairAggregator(SUM_LONGS(), SUM_LONGS()); |
| |
| assertThat(sapply(a, input), is(Pair.of(1729L, 20L))); |
| } |
| |
| @Test |
| public void testTrips() { |
| List<Tuple3<Float, Double, Double>> input = ImmutableList.of(Tuple3.of(17.29f, 12.2, 0.1), |
| Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98)); |
| Aggregator<Tuple3<Float, Double, Double>> a = Aggregators.tripAggregator( |
| MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES()); |
| |
| List<Tuple3<Float, BigDecimal, BigDecimal>> input1 = ImmutableList.of(Tuple3.of(17.29f, bigDecimal("12.2"), bigDecimal("0.1")), |
| Tuple3.of(3.0f, bigDecimal("1.2"), bigDecimal("3.14")), Tuple3.of(-1.0f, bigDecimal("14.5"), bigDecimal("-0.98"))); |
| Aggregator<Tuple3<Float, BigDecimal, BigDecimal>> b = Aggregators.tripAggregator( |
| MAX_FLOATS(), MAX_BIGDECIMALS(), MIN_BIGDECIMALS()); |
| |
| assertThat(sapply(a, input), is(Tuple3.of(17.29f, 14.5, -0.98))); |
| assertThat(sapply(b, input1), is(Tuple3.of(17.29f, bigDecimal("14.5"), bigDecimal("-0.98")))); |
| } |
| |
| @Test |
| public void testQuads() { |
| List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(Tuple4.of(17.29f, 12.2, 0.1, 1), |
| Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3)); |
| Aggregator<Tuple4<Float, Double, Double, Integer>> a = Aggregators.quadAggregator( |
| MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS()); |
| |
| List<Tuple4<BigDecimal, Double, Double, Integer>> input1 = ImmutableList.of(Tuple4.of(bigDecimal("17.29"), 12.2, 0.1, 1), |
| Tuple4.of(bigDecimal("3.0"), 1.2, 3.14, 2), Tuple4.of(bigDecimal("-1.0"), 14.5, -0.98, 3)); |
| Aggregator<Tuple4<BigDecimal, Double, Double, Integer>> b = Aggregators.quadAggregator( |
| MAX_BIGDECIMALS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS()); |
| |
| assertThat(sapply(a, input), is(Tuple4.of(17.29f, 14.5, -0.98, 6))); |
| assertThat(sapply(b, input1), is(Tuple4.of(bigDecimal("17.29"), 14.5, -0.98, 6))); |
| } |
| |
| @Test |
| public void testTupleN() { |
| List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L), new TupleN(4, 17.0, 1, 9.7, 12L)); |
| Aggregator<TupleN> a = Aggregators.tupleAggregator( |
| MIN_INTS(), SUM_DOUBLES(), MAX_INTS(), MIN_DOUBLES(), MAX_LONGS()); |
| |
| assertThat(sapply(a, input), is(new TupleN(1, 20.0, 1, 2.0, 12L))); |
| } |
| |
| @Test |
| public void testConcatenation() { |
| assertThat(sapply(STRING_CONCAT("", true), "foo", "foobar", "bar"), is("foofoobarbar")); |
| assertThat(sapply(STRING_CONCAT("/", false), "foo", "foobar", "bar"), is("foo/foobar/bar")); |
| assertThat(sapply(STRING_CONCAT(" ", true), " ", ""), is(" ")); |
| assertThat(sapply(STRING_CONCAT(" ", true), Arrays.asList(null, "")), is("")); |
| assertThat(sapply(STRING_CONCAT(" ", true, 20, 3), "foo", "foobar", "bar"), is("foo bar")); |
| assertThat(sapply(STRING_CONCAT(" ", true, 10, 6), "foo", "foobar", "bar"), is("foo foobar")); |
| assertThat(sapply(STRING_CONCAT(" ", true, 9, 6), "foo", "foobar", "bar"), is("foo bar")); |
| } |
| |
| @Test(expected = NullPointerException.class) |
| public void testConcatenationNullException() { |
| sapply(STRING_CONCAT(" ", false), Arrays.asList(null, "" )); |
| } |
| |
| |
| private static <T> T sapply(Aggregator<T> a, T... values) { |
| return sapply(a, ImmutableList.copyOf(values)); |
| } |
| |
| private static <T> T sapply(Aggregator<T> a, Iterable<T> values) { |
| return Iterables.getOnlyElement(apply(a, values)); |
| } |
| |
| private static <T> ImmutableList<T> apply(Aggregator<T> a, T... values) { |
| return apply(a, ImmutableList.copyOf(values)); |
| } |
| |
| private static <T> ImmutableList<T> apply(Aggregator<T> a, Iterable<T> values) { |
| CombineFn<String, T> fn = Aggregators.toCombineFn(a); |
| |
| InMemoryEmitter<Pair<String, T>> e1 = new InMemoryEmitter<Pair<String,T>>(); |
| fn.process(Pair.of("", values), e1); |
| |
| // and a second time to make sure Aggregator.reset() works |
| InMemoryEmitter<Pair<String, T>> e2 = new InMemoryEmitter<Pair<String,T>>(); |
| fn.process(Pair.of("", values), e2); |
| |
| assertEquals(getValues(e1), getValues(e2)); |
| |
| return getValues(e1); |
| } |
| |
| private static <K, V> ImmutableList<V> getValues(InMemoryEmitter<Pair<K, V>> emitter) { |
| return ImmutableList.copyOf( |
| Iterables.transform(emitter.getOutput(), new Function<Pair<K, V>, V>() { |
| @Override |
| public V apply(Pair<K, V> input) { |
| return input.second(); |
| } |
| })); |
| } |
| |
| private static BigInteger bigInt(String value) { |
| return new BigInteger(value); |
| } |
| |
| private static BigDecimal bigDecimal(String value) { |
| return new BigDecimal(value); |
| } |
| } |