blob: 54d20f9e5dd5d2513519925d88dd62f622454a0e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.lib;
import com.google.common.collect.Lists;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import java.util.Collection;
import java.util.Iterator;
/**
* Tools for creating top lists of items in PTables and PCollections
*/
public class TopList {
/**
* Create a top-list of elements in the provided PTable, categorised by the key of the input table and using the count
* of the value part of the input table. Example: if input = Table(Country, Track), then this will give you the most
* common n tracks for each country.
* @param input table of X Y pairs
* @param n How many Y values to include in the toplist per X (this will be in memory, so don't make this ridiculous)
* @param <X> group type
* @param <Y> value type
* @return table of each unique X value mapped to a collection of (count, Y) pairs
*/
public static <X, Y> PTable<X, Collection<Pair<Long, Y>>> topNYbyX(PTable<X, Y> input, final int n) {
final PType<X> xType = input.getKeyType();
final PType<Y> yType = input.getValueType();
PTypeFamily f = xType.getFamily();
PTable<X, Pair<Long, Y>> counted = input.count().parallelDo(new MapFn<Pair<Pair<X, Y>, Long>, Pair<X, Pair<Long, Y>>>() {
@Override
public Pair<X, Pair<Long, Y>> map(Pair<Pair<X, Y>, Long> input) {
return Pair.of(input.first().first(), Pair.of(-input.second(), input.first().second()));
}
}, f.tableOf(xType, f.pairs(f.longs(), yType)));
return SecondarySort.sortAndApply(counted, new MapFn<Pair<X, Iterable<Pair<Long, Y>>>, Pair<X, Collection<Pair<Long, Y>>>>() {
private PTableType<Long, Y> tableType;
@Override
public void initialize() {
PTypeFamily ptf = yType.getFamily();
tableType = ptf.tableOf(ptf.longs(), yType);
tableType.initialize(getConfiguration());
}
@Override
public Pair<X, Collection<Pair<Long, Y>>> map(Pair<X, Iterable<Pair<Long, Y>>> input) {
Collection<Pair<Long, Y>> values = Lists.newArrayList();
Iterator<Pair<Long, Y>> iter = input.second().iterator();
for (int i = 0; i < n; i++) {
if (!iter.hasNext()) {
break;
}
Pair<Long, Y> pair = PTables.getDetachedValue(tableType, iter.next());
values.add(Pair.of(-pair.first(), pair.second()));
}
return Pair.of(input.first(), values);
}
}, f.tableOf(xType, f.collections(f.pairs(f.longs(), yType))));
}
/**
* Create a list of unique items in the input collection with their count, sorted descending by their frequency.
* @param input input collection
* @param <X> record type
* @return global toplist
*/
public static <X> PTable<X, Long> globalToplist(PCollection<X> input) {
return negateCounts(negateCounts(input.count()).groupByKey(1).ungroup());
}
/**
* When creating toplists, it is often required to sort by count descending. As some sort operations don't support
* order (such as SecondarySort), this method will negate counts so that a natural-ordered sort will produce a
* descending order.
* @param table PTable to process
* @param <K> key type
* @return PTable of the same format with the value negated
*/
public static <K> PTable<K, Long> negateCounts(PTable<K, Long> table) {
return table.parallelDo(new MapFn<Pair<K, Long>, Pair<K, Long>>() {
@Override
public Pair<K, Long> map(Pair<K, Long> input) {
return Pair.of(input.first(), -input.second());
}
}, table.getPTableType());
}
}