blob: 09553d1d7df369860ccbd6b780ef8b5626d1710c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.join;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for CoGroupByKeyTest. Implements Serializable for anonymous DoFns. */
@RunWith(JUnit4.class)
public class CoGroupByKeyTest implements Serializable {
/**
* Converts the given list into a PCollection belonging to the provided Pipeline in such a way
* that coder inference needs to be performed.
*/
private PCollection<KV<Integer, String>> createInput(
String name, Pipeline p, List<KV<Integer, String>> list) {
return createInput(name, p, list, new ArrayList<>());
}
/** Converts the given list with timestamps into a PCollection. */
private PCollection<KV<Integer, String>> createInput(
String name, Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) {
PCollection<KV<Integer, String>> input;
if (timestamps.isEmpty()) {
input =
p.apply(
"Create" + name,
Create.of(list)
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
} else {
input =
p.apply(
"Create" + name,
Create.timestamped(list, timestamps)
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
}
return input.apply(
"Identity" + name,
ParDo.of(
new DoFn<KV<Integer, String>, KV<Integer, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
}));
}
/**
* Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result of a {@link
* CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, where each {@link PCollection}
* has no duplicate keys and the key sets of each {@link PCollection} are intersecting but neither
* is a subset of the other.
*/
private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(
Pipeline p, TupleTag<String> tag1, TupleTag<String> tag2) {
List<KV<Integer, String>> list1 =
Arrays.asList(KV.of(1, "collection1-1"), KV.of(2, "collection1-2"));
List<KV<Integer, String>> list2 =
Arrays.asList(KV.of(2, "collection2-2"), KV.of(3, "collection2-3"));
PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1);
PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2);
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
KeyedPCollectionTuple.of(tag1, collection1)
.and(tag2, collection2)
.apply(CoGroupByKey.create());
return coGbkResults;
}
@Rule public final transient TestPipeline p = TestPipeline.create();
@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
public void testCoGroupByKeyGetOnly() {
final TupleTag<String> tag1 = new TupleTag<>();
final TupleTag<String> tag2 = new TupleTag<>();
PCollection<KV<Integer, CoGbkResult>> coGbkResults = buildGetOnlyGbk(p, tag1, tag2);
PAssert.thatMap(coGbkResults)
.satisfies(
results -> {
assertEquals("collection1-1", results.get(1).getOnly(tag1));
assertEquals("collection1-2", results.get(2).getOnly(tag1));
assertEquals("collection2-2", results.get(2).getOnly(tag2));
assertEquals("collection2-3", results.get(3).getOnly(tag2));
return null;
});
p.run();
}
/**
* Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the results of the {@code
* CoGroupByKey} over three {@code PCollection<KV<Integer, String>>}, each of which correlates a
* customer id to purchases, addresses, or names, respectively.
*/
private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk(
Pipeline p,
TupleTag<String> purchasesTag,
TupleTag<String> addressesTag,
TupleTag<String> namesTag) {
List<KV<Integer, String>> idToPurchases =
Arrays.asList(
KV.of(2, "Boat"),
KV.of(1, "Shoes"),
KV.of(3, "Car"),
KV.of(1, "Book"),
KV.of(10, "Pens"),
KV.of(8, "House"),
KV.of(4, "Suit"),
KV.of(11, "House"),
KV.of(14, "Shoes"),
KV.of(2, "Suit"),
KV.of(8, "Suit Case"),
KV.of(3, "House"));
List<KV<Integer, String>> idToAddress =
Arrays.asList(
KV.of(2, "53 S. 3rd"),
KV.of(10, "383 Jackson Street"),
KV.of(20, "3 W. Arizona"),
KV.of(3, "29 School Rd"),
KV.of(8, "6 Watling Rd"));
List<KV<Integer, String>> idToName =
Arrays.asList(
KV.of(1, "John Smith"),
KV.of(2, "Sally James"),
KV.of(8, "Jeffery Spalding"),
KV.of(20, "Joan Lichtfield"));
PCollection<KV<Integer, String>> purchasesTable =
createInput("CreateIdToPurchases", p, idToPurchases);
PCollection<KV<Integer, String>> addressTable =
createInput("CreateIdToAddress", p, idToAddress);
PCollection<KV<Integer, String>> nameTable = createInput("CreateIdToName", p, idToName);
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
KeyedPCollectionTuple.of(namesTag, nameTable)
.and(addressesTag, addressTable)
.and(purchasesTag, purchasesTable)
.apply(CoGroupByKey.create());
return coGbkResults;
}
/**
* Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the results of the {@code
* CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, each of which correlates a
* customer id to clicks, purchases, respectively.
*/
private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing(
Pipeline p, TupleTag<String> clicksTag, TupleTag<String> purchasesTag) {
List<KV<Integer, String>> idToClick =
Arrays.asList(
KV.of(1, "Click t0"),
KV.of(2, "Click t2"),
KV.of(1, "Click t4"),
KV.of(1, "Click t6"),
KV.of(2, "Click t8"));
List<KV<Integer, String>> idToPurchases =
Arrays.asList(
KV.of(1, "Boat t1"),
KV.of(1, "Shoesi t2"),
KV.of(1, "Pens t3"),
KV.of(2, "House t4"),
KV.of(2, "Suit t5"),
KV.of(1, "Car t6"),
KV.of(1, "Book t7"),
KV.of(2, "House t8"),
KV.of(2, "Shoes t9"),
KV.of(2, "House t10"));
PCollection<KV<Integer, String>> clicksTable =
createInput("CreateClicks", p, idToClick, Arrays.asList(0L, 2L, 4L, 6L, 8L))
.apply(
"WindowClicks",
Window.<KV<Integer, String>>into(FixedWindows.of(new Duration(4)))
.withTimestampCombiner(TimestampCombiner.EARLIEST));
PCollection<KV<Integer, String>> purchasesTable =
createInput(
"CreatePurchases",
p,
idToPurchases,
Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
.apply(
"WindowPurchases",
Window.<KV<Integer, String>>into(FixedWindows.of(new Duration(4)))
.withTimestampCombiner(TimestampCombiner.EARLIEST));
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
KeyedPCollectionTuple.of(clicksTag, clicksTable)
.and(purchasesTag, purchasesTable)
.apply(CoGroupByKey.create());
return coGbkResults;
}
@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
public void testCoGroupByKey() {
final TupleTag<String> namesTag = new TupleTag<>();
final TupleTag<String> addressesTag = new TupleTag<>();
final TupleTag<String> purchasesTag = new TupleTag<>();
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
PAssert.thatMap(coGbkResults)
.satisfies(
results -> {
CoGbkResult result1 = results.get(1);
assertEquals("John Smith", result1.getOnly(namesTag));
assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book"));
CoGbkResult result2 = results.get(2);
assertEquals("Sally James", result2.getOnly(namesTag));
assertEquals("53 S. 3rd", result2.getOnly(addressesTag));
assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat"));
CoGbkResult result3 = results.get(3);
assertEquals("29 School Rd", "29 School Rd", result3.getOnly(addressesTag));
assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House"));
CoGbkResult result8 = results.get(8);
assertEquals("Jeffery Spalding", result8.getOnly(namesTag));
assertEquals("6 Watling Rd", result8.getOnly(addressesTag));
assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case"));
CoGbkResult result20 = results.get(20);
assertEquals("Joan Lichtfield", result20.getOnly(namesTag));
assertEquals("3 W. Arizona", result20.getOnly(addressesTag));
assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag));
assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit"));
assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens"));
assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House"));
assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes"));
return null;
});
p.run();
}
/**
* A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the results of a
* CoGroupByKey.
*/
private static class ClickOfPurchaseFn
extends DoFn<KV<Integer, CoGbkResult>, KV<String, String>> {
private final TupleTag<String> clicksTag;
private final TupleTag<String> purchasesTag;
private ClickOfPurchaseFn(TupleTag<String> clicksTag, TupleTag<String> purchasesTag) {
this.clicksTag = clicksTag;
this.purchasesTag = purchasesTag;
}
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
BoundedWindow w = window;
KV<Integer, CoGbkResult> e = c.element();
CoGbkResult row = e.getValue();
Iterable<String> clicks = row.getAll(clicksTag);
Iterable<String> purchases = row.getAll(purchasesTag);
for (String click : clicks) {
for (String purchase : purchases) {
c.output(
KV.of(
click + ":" + purchase,
c.timestamp().getMillis() + ":" + w.maxTimestamp().getMillis()));
}
}
}
}
/**
* A DoFn used in testCoGroupByKeyHandleResults(), to test processing the results of a
* CoGroupByKey.
*/
private static class CorrelatePurchaseCountForAddressesWithoutNamesFn
extends DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
private final TupleTag<String> purchasesTag;
private final TupleTag<String> addressesTag;
private final TupleTag<String> namesTag;
private CorrelatePurchaseCountForAddressesWithoutNamesFn(
TupleTag<String> purchasesTag, TupleTag<String> addressesTag, TupleTag<String> namesTag) {
this.purchasesTag = purchasesTag;
this.addressesTag = addressesTag;
this.namesTag = namesTag;
}
@ProcessElement
public void processElement(ProcessContext c) {
KV<Integer, CoGbkResult> e = c.element();
CoGbkResult row = e.getValue();
// Don't actually care about the id.
Iterable<String> names = row.getAll(namesTag);
if (names.iterator().hasNext()) {
// Nothing to do. There was a name.
return;
}
Iterable<String> addresses = row.getAll(addressesTag);
if (!addresses.iterator().hasNext()) {
// Nothing to do, there was no address.
return;
}
// Buffer the addresses so we can accredit all of them with
// corresponding purchases. All addresses are for the same id, so
// if there are multiple, we apply the same purchase count to all.
ArrayList<String> addressList = new ArrayList<>();
for (String address : addresses) {
addressList.add(address);
}
Iterable<String> purchases = row.getAll(purchasesTag);
int purchaseCount = Iterables.size(purchases);
for (String address : addressList) {
c.output(KV.of(address, purchaseCount));
}
}
}
/**
* Tests that the consuming DoFn (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as
* expected.
*/
@SuppressWarnings("unchecked")
@Test
@Category(NeedsRunner.class)
public void testConsumingDoFn() throws Exception {
TupleTag<String> purchasesTag = new TupleTag<>();
TupleTag<String> addressesTag = new TupleTag<>();
TupleTag<String> namesTag = new TupleTag<>();
// result1 should get filtered out because it has a name.
CoGbkResult result1 =
CoGbkResult.of(purchasesTag, Arrays.asList("3a", "3b"))
.and(addressesTag, Arrays.asList("2a", "2b"))
.and(namesTag, Arrays.asList("1a"));
// result 2 should be counted because it has an address and purchases.
CoGbkResult result2 =
CoGbkResult.of(purchasesTag, Arrays.asList("5a", "5b"))
.and(addressesTag, Arrays.asList("4a"))
.and(namesTag, new ArrayList<>());
// result 3 should not be counted because it has no addresses.
CoGbkResult result3 =
CoGbkResult.of(purchasesTag, Arrays.asList("7a", "7b"))
.and(addressesTag, new ArrayList<>())
.and(namesTag, new ArrayList<>());
// result 4 should be counted as 0, because it has no purchases.
CoGbkResult result4 =
CoGbkResult.of(purchasesTag, new ArrayList<>())
.and(addressesTag, Arrays.asList("8a"))
.and(namesTag, new ArrayList<>());
KvCoder<Integer, CoGbkResult> coder =
KvCoder.of(
VarIntCoder.of(),
CoGbkResult.CoGbkResultCoder.of(
CoGbkResultSchema.of(ImmutableList.of(purchasesTag, addressesTag, namesTag)),
UnionCoder.of(
ImmutableList.of(
StringUtf8Coder.of(), StringUtf8Coder.of(), StringUtf8Coder.of()))));
PCollection<KV<String, Integer>> results =
p.apply(
Create.of(
KV.of(1, result1), KV.of(2, result2), KV.of(3, result3), KV.of(4, result4))
.withCoder(coder))
.apply(
ParDo.of(
new CorrelatePurchaseCountForAddressesWithoutNamesFn(
purchasesTag, addressesTag, namesTag)));
PAssert.that(results).containsInAnyOrder(KV.of("4a", 2), KV.of("8a", 0));
p.run();
}
/**
* Tests the pipeline end-to-end. Builds the purchases CoGroupByKey, and applies
* CorrelatePurchaseCountForAddressesWithoutNamesFn to the results.
*/
@SuppressWarnings("unchecked")
@Test
@Category(ValidatesRunner.class)
public void testCoGroupByKeyHandleResults() {
TupleTag<String> namesTag = new TupleTag<>();
TupleTag<String> addressesTag = new TupleTag<>();
TupleTag<String> purchasesTag = new TupleTag<>();
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
// Do some simple processing on the result of the CoGroupByKey. Count the
// purchases for each address on record that has no associated name.
PCollection<KV<String, Integer>> purchaseCountByKnownAddressesWithoutKnownNames =
coGbkResults.apply(
ParDo.of(
new CorrelatePurchaseCountForAddressesWithoutNamesFn(
purchasesTag, addressesTag, namesTag)));
PAssert.that(purchaseCountByKnownAddressesWithoutKnownNames)
.containsInAnyOrder(KV.of("29 School Rd", 2), KV.of("383 Jackson Street", 1));
p.run();
}
/** Tests the pipeline end-to-end with FixedWindows. */
@SuppressWarnings("unchecked")
@Test
@Category(ValidatesRunner.class)
public void testCoGroupByKeyWithWindowing() {
TupleTag<String> clicksTag = new TupleTag<>();
TupleTag<String> purchasesTag = new TupleTag<>();
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
buildPurchasesCoGbkWithWindowing(p, clicksTag, purchasesTag);
PCollection<KV<String, String>> clickOfPurchase =
coGbkResults.apply(ParDo.of(new ClickOfPurchaseFn(clicksTag, purchasesTag)));
PAssert.that(clickOfPurchase)
.containsInAnyOrder(
KV.of("Click t0:Boat t1", "0:3"),
KV.of("Click t0:Shoesi t2", "0:3"),
KV.of("Click t0:Pens t3", "0:3"),
KV.of("Click t4:Car t6", "4:7"),
KV.of("Click t4:Book t7", "4:7"),
KV.of("Click t6:Car t6", "4:7"),
KV.of("Click t6:Book t7", "4:7"),
KV.of("Click t8:House t8", "8:11"),
KV.of("Click t8:Shoes t9", "8:11"),
KV.of("Click t8:House t10", "8:11"));
p.run();
}
}