---
title: Sampling - Guide - Apache DataFu Pig
version: 1.4.0
section_name: Apache DataFu Pig - Guide
license: >
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
## Sampling
Pig has a built-in `SAMPLE` operator that performs [Bernoulli sampling](http://en.wikipedia.org/wiki/Bernoulli_sampling)
on a relation. Apache DataFu Pig provides additional sampling techniques for when Bernoulli sampling is not applicable.
### Simple Random Sampling
[Simple Random Sampling](http://en.wikipedia.org/wiki/Simple_random_sampling) produces samples of a specific size,
where each item has the same probability of being chosen. DataFu has scalable implementations of this that will
generate samples of exactly the right size with very high probability (at least 99.99%).
Pig's `SAMPLE`, on the other hand, produces a sample of size *roughly* `p*n`,
where `p` is the sampling probability and `n` is the population size. With `SAMPLE` there are no guarantees on the size of the
generated sample.
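For comparison, here is a minimal sketch of Bernoulli sampling with the built-in operator (the alias and input path are illustrative):
```pig
-- Pig's built-in Bernoulli sampling: each record is kept independently
-- with probability 0.01, so the output size is only approximately 1% of n.
data = LOAD 'input' AS (x:double);
sampled = SAMPLE data 0.01;
```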
#### Simple Random Sample Without Replacement
[SimpleRandomSample](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SimpleRandomSample.html)
implements scalable simple random sampling.
It can be used to generate a sample of size exactly `ceil(p*n)` without replacement from a population of size `n`,
where `p` is the sampling probability. The output will be exactly this size with probability at least 99.99%. Sampling "without replacement"
means that no item will appear more than once.
To use it, pass the sampling probability into the UDF's constructor and then pass in a bag to be sampled.
For example, the following will produce a 1% sample:
```pig
DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01');
data = LOAD 'input' AS (x:double);
sampled = FOREACH (GROUP data ALL) GENERATE FLATTEN(SRS(data));
```
This UDF can also be used to perform [stratified sampling](http://en.wikipedia.org/wiki/Stratified_sampling).
For example, the following takes a 1% sample within each stratum, using the label as the stratum and a proportional allocation strategy:
```pig
DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01');
examples = LOAD 'input' AS (x:double,label:chararray);
grouped = GROUP examples BY label;
sampled = FOREACH grouped GENERATE FLATTEN(SRS(examples));
```
#### Simple Random Sample With Replacement
[SimpleRandomSampleWithReplacementVote](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.html) and
[SimpleRandomSampleWithReplacementElect](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.html)
together implement a scalable algorithm for simple random sampling with replacement.
These can be used to generate a sample of a specific size; the output will be exactly this size with probability at least 99.99%.
To use these UDFs, provide the desired sample size and a good lower bound on the population
size (or the exact size). For example, to generate a sample of 100,000 items with replacement:
```pig
DEFINE SRSWR_VOTE
datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
DEFINE SRSWR_ELECT
datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
item = LOAD 'input' AS (x:double);
summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
candidates = FOREACH item GENERATE
FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count));
sampled = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE
FLATTEN(SRSWR_ELECT(candidates));
```
Here we pass in the exact size for the lower bound. Because of the way the algorithm works, we can use many reducers
to generate the final set of sampled data, which is why we use `PARALLEL 10`. The parallel factor can be increased
if necessary to distribute the work further.
Sampling with replacement is used heavily in [bootstrapping](http://en.wikipedia.org/wiki/Bootstrapping_%28statistics%29).
For example, the following script generates 100 bootstrap samples, computes the mean value for each sample,
and then outputs the bootstrap estimates. Because each candidate emitted by `SRSWR_VOTE` carries a `position` field
identifying the slot in the output sample it competes for, grouping by `position % 100` partitions the candidates
into the 100 bootstrap samples.
```pig
summary = FOREACH (GROUP item ALL) GENERATE
AVG(item.x) AS mean, COUNT(item) AS count;
candidates = FOREACH item GENERATE
FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count));
sampled = FOREACH (GROUP candidates BY (position % 100)
PARALLEL 10) GENERATE
AVG(SRSWR_ELECT(candidates)) AS mean;
bootstrap = FOREACH (GROUP sampled ALL) GENERATE
summary.mean AS mean, sampled.mean AS bootstrapMeans;
```
### Weighted Random Sampling
A weighted sample is similar to a simple random sample without replacement in that it generates a sample
with a specific size. The difference is that the probability of selecting each item can be different.
[WeightedSample](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/WeightedSample.html)
provides an implementation of this.
`WeightedSample` operates on a bag of items, where each item has a weight attached to it. It iteratively selects items from the
bag until it reaches the desired output size. Since this samples without replacement, once an item is selected it cannot
appear again. The probability of selecting an item is that item's weight divided by the sum of the weights of all remaining
items.
For example, suppose that we have a bag of four items: `a`, `b`, `c`, and `d`. In this bag, `a` has a weight of 100 and
the rest have a weight of 1.
```pig
data = LOAD 'input' AS (A: bag{T: tuple(name:chararray,score:int)});
/* Contains a single bag:
{(a,100),(b,1),(c,1),(d,1)}
*/
```
We expect a weighted sample of this bag to contain `a` with very high probability: on the first draw alone,
`a` is selected with probability `100/103`, or about 97%.
Let's generate a sample of size 3 from this bag. To do this we pass in the bag, along with 1 to indicate that the weight
is at index 1 within each tuple, and the sample size of 3.
```pig
DEFINE WeightedSample datafu.pig.sampling.WeightedSample();
result = FOREACH data GENERATE WeightedSample(A,1,3);
```
This is likely to generate output like the following, where `a` tends to be present due to its high weight.
```pig
DUMP result;
/*
({(a,100),(c,1),(b,1)})
*/
```
Alternatively, if we don't pass in a sample size, `WeightedSample` will include all items, with the order influenced
by the item weights.
```pig
result = FOREACH data GENERATE WeightedSample(A,1);
DUMP result;
/*
({(a,100),(c,1),(b,1),(d,1)})
*/
```
One simple technique for generating weights for `WeightedSample` is to use DataFu's
[Enumerate](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/Enumerate.html) UDF,
which appends to each item's tuple its index within the bag.
Again, suppose we have a bag with values `a`, `b`, `c`, `d`, but this time without weights.
```pig
data = LOAD 'input' AS (A: bag{T: tuple(name:chararray)});
/* Contains a single bag:
{(a),(b),(c),(d)}
*/
```
Using `Enumerate`, we can append the index for each item and then compute a score from it.
```pig
DEFINE Enumerate datafu.pig.bags.Enumerate();
data = FOREACH data GENERATE Enumerate(A) as A;
data = FOREACH data {
  A = FOREACH A GENERATE name, 1/(double)(i+1) as score;
  GENERATE A;
}
/* Produces:
({(a,1.0),(b,0.5),(c,0.3333333333333333),(d,0.25)})
*/
```
This bag can then be passed into `WeightedSample`, producing a weighted sample in which items near the
beginning of the bag are more likely to be selected, as shown below.
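As a minimal sketch of this final step (the sample size of 2 here is illustrative), the score computed above sits at index 1 of each tuple:
```pig
DEFINE WeightedSample datafu.pig.sampling.WeightedSample();
-- Sample 2 of the 4 items, using the score at index 1 as the weight;
-- higher-scoring items (those earlier in the bag) are favored.
result = FOREACH data GENERATE WeightedSample(A,1,2);
```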
### Consistently Sampling By Key
A common use case for sampling is selecting a set of training examples for building a prediction model.
For example, suppose that we have a recommendation system where we have tracked when items have been
impressed to users and when they have clicked on them:
```pig
impressions = LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long);
clicks = LOAD '$clicks' AS (user_id:int, item_id:int, timestamp:long);
```
Using this data we would like to build a model that can predict user behavior so that we can show items
to users that they are more likely to click on. Since the data may be very large, we need to take a
sample that is easier to work with. Essentially, we want to join on `(user_id,item_id)`, sample the result,
and produce training data with the following format:
```pig
{(user_id:int, item_id:int, is_impressed:int, is_clicked:int)}
```
The problem with this approach, though, is that the join can be very expensive when the data is large.
Sampling reduces the data size, but it would have to be applied after the join, because independently sampling
`impressions` and `clicks` won't select the same `(user_id,item_id)` pairs from each.
[SampleByKey](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SampleByKey.html)
solves this problem by allowing us to sample consistently across multiple relations. By this we mean
that if a "key" appears in the sample output for one relation, then the same key will appear in every
other relation as well. This guarantees that we can apply the join after sampling.
This is essentially Bernoulli sampling, except that the "random number" is derived by applying a cryptographic
hash to the key rather than by invoking a pseudo-random number generator.
Let's see how we can apply this to our example above. We want to take a 10% sample of the joined clicks
and impressions. We start by defining the UDF:
```pig
DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.1');
```
Since we are going to be joining on `(user_id,item_id)`, we need to sample using this pair:
```pig
impressions = FILTER impressions BY SampleByKey(user_id,item_id);
clicks = FILTER clicks BY SampleByKey(user_id,item_id);
```
We can now join the impressions and clicks, with the knowledge that the same `(user_id,item_id)`
pairs will appear in both samples.
```pig
joined_sample = FOREACH (COGROUP impressions BY (user_id,item_id),
                         clicks BY (user_id,item_id)) GENERATE
    group.user_id as user_id,
    group.item_id as item_id,
    (SIZE(impressions) > 0 ? 1 : 0) as is_impressed,
    (SIZE(clicks) > 0 ? 1 : 0) as is_clicked;
```
Since we have sampled before joining the data, this should be much more efficient.
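The sampled training data can then be stored for model building; a minimal sketch (the output path is illustrative):
```pig
-- Persist the sampled training examples for downstream model training.
STORE joined_sample INTO 'training_data';
```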