| --- |
| title: Sampling - Guide - Apache DataFu Pig |
| version: 1.6.0 |
| section_name: Apache DataFu Pig - Guide |
| license: > |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --- |
| |
| ## Sampling |
| |
| Pig has a built-in `SAMPLE` operator that performs [Bernoulli sampling](http://en.wikipedia.org/wiki/Bernoulli_sampling) |
| on a relation. Apache DataFu Pig provides additional sampling techniques for when Bernoulli sampling is not applicable. |
| |
| ### Simple Random Sampling |
| |
| [Simple Random Sampling](http://en.wikipedia.org/wiki/Simple_random_sampling) produces samples of a specific size, |
| where each item has the same probability of being chosen. DataFu has scalable implementations of this that will |
| generate samples of exactly the right size with very high probability (at least 99.99%). |
| Pig's `SAMPLE`, on the other hand, produces a sample of size *roughly* `p*n`, |
| where `p` is the sampling probability and `n` is the sample size. With `SAMPLE` there are no guarantees on the size of the |
| generated sample. |
| |
| #### Simple Random Sample Without Replacement |
| |
| [SimpleRandomSample](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SimpleRandomSample.html) |
| implements scalable simple random sampling. |
| It can be used to generate a sample of size of exactly `ceil(p*n)` without replacement from a population of size `n`, |
| where `p` is the sampling |
| probability. The output will be exactly this size with probability at least 99.99%. Sampling "without replacement" |
| means that no item will appear more than once. |
| |
| To use it simply pass in the sampling probability into the UDF's constructor and then pass in a bag to be sampled. |
| For example, the following will produce a 1% sample: |
| |
| ```pig |
| DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01'); |
| |
| input = LOAD 'input' AS (x:double); |
| sampled = FOREACH (GROUP input ALL) GENERATE FLATTEN(SRS(input)); |
| ``` |
| |
| This UDF can also be used to perform [stratified sampling](http://en.wikipedia.org/wiki/Stratified_sampling). |
| For example, the following takes a 1% stratified sample using a label and a proportional allocation strategy: |
| |
| ```pig |
| DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01'); |
| examples = LOAD 'input' AS (x:double,label:chararray); |
| grouped = GROUP examples BY label; |
| sampled = FOREACH grouped GENERATE FLATTEN(SRS(examples)); |
| ``` |
| |
| #### Simple Random Sample With Replacement |
| |
| [SimpleRandomSampleWithReplacementVote](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.html) and |
| [SimpleRandomSampleWithReplacementElect](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.html) |
| together implements a scalable algorithm for simple random sampling with replacement. |
| These can be used to generate a sample of a specific size, with probability at least 99.99%. |
| |
| To use these UDFs, the user needs to provide the desired sample size and a good lower bound on the population |
| size (or the exact size). For example, to generate a sample of 100,000 without replacement: |
| |
| ```pig |
| DEFINE SRSWR_VOTE |
| datafu.pig.sampling.SimpleRandomSampleWithReplacementVote(); |
| DEFINE SRSWR_ELECT |
| datafu.pig.sampling.SimpleRandomSampleWithReplacementElect(); |
| |
| item = LOAD 'input' AS (x:double); |
| summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count; |
| candidates = FOREACH item GENERATE |
| FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count)); |
| sampled = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE |
| FLATTEN(SRSWR_ELECT(candidates)); |
| ``` |
| |
| Here we pass in the exact size for the lower bound. Because of the way the algorithm works, we can use many reducers |
| to generate the final set of sampled data. This is why we use `PARALLEL 10`. The parallel factor can be increased |
| if necessary to distribute the work more. |
| |
| Sampling with replacement is used heavily in [bootstrapping](http://en.wikipedia.org/wiki/Bootstrapping_%28statistics%29). |
| For example, the following script generates 100 bootstrap samples, computes the mean value for each sample, |
| and then outputs the bootstrap estimates. |
| |
| ```pig |
| summary = FOREACH (GROUP item ALL) GENERATE |
| AVG(item.x) AS mean, COUNT(item) AS count; |
| candidates = FOREACH item GENERATE |
| FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count)); |
| sampled = FOREACH (GROUP candidates BY (position % 100) |
| PARALLEL 10) GENERATE |
| AVG(SRSWR_ELECT(candidates)) AS mean; |
| bootstrap = FOREACH (GROUP sampled ALL) GENERATE |
| summary.mean AS mean, sampled.mean AS bootstrapMeans; |
| ``` |
| |
| ### Weighted Random Sampling |
| |
| A weighted sample is similar to a simple random sample without replacement in that it generates a sample |
| with a specific size. The difference is that the probability of selecting each item can be different. |
| [WeightedSample](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/WeightedSample.html) |
| provides an implementation of this. |
| |
| `WeightedSample` operates on a bag of items, where each item has a weight attached to it. It iteratively selects tems from the |
| bag until it reaches the desired output bag size. Since this samples without replacement, once an item is selected it cannot |
| appear again. The probability of selecting an item is given by that item's weight divided by the sum of all remaining |
| items' weights. |
| |
| For example, suppose |
| that we have a bag of four items: `a`, `b`, `c`, `d`. For this bag, `a` has a weight of 100 and the remaining have a weight |
| of 1. |
| |
| ```pig |
| input = LOAD 'input' AS (A: bag{T: tuple(name:chararray,score:int)}); |
| /* Contains a single bag: |
| {(a,100),(b,1),(c,1),(d,1)} |
| */ |
| ``` |
| |
| We expect a weighted sample of this bag to contain `a` with very high probability. |
| Let's generate a sample of size 3 from this bag. To do this we pass in the bag, with 1 to indicate the weight |
| is at index 1, and the sample size of 3. |
| |
| ```pig |
| define WeightedSample datafu.pig.sampling.WeightedSample() |
| |
| result = FOREACH input GENERATE WeightedSample(A,1,3); |
| ``` |
| |
| This is likely to generate output like this, where `a` tends to be present due to its high weight. |
| |
| ```pig |
| DUMP result; |
| /* |
| ({(a,100),(c,5),(b,1)}) |
| */ |
| ``` |
| |
| Alternatively, if we don't pass in sample size, `WeightedSample` will include all items, with the order being influenced |
| by the item weights. |
| |
| ```pig |
| result = FOREACH input GENERATE WeightedSample(A,1); |
| |
| DUMP result; |
| /* |
| ({(a,100),(c,5),(b,1),(d,1)}) |
| */ |
| ``` |
| |
| One simple technique for generating weights that can be used with `WeightedSample` is to use DataFu's |
| [Enumerate](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/Enumerate.html) UDF, which can be used |
| to append each item's tuple with its index within the bag. |
| |
| Again, suppose we have a bag with values `a`, `b`, `c`, `d`, but this time without weights. |
| |
| ```pig |
| input = LOAD 'input' AS (A: bag{T: tuple(name:chararray)}); |
| /* Contains a single bag: |
| {(a),(b),(c),(d)} |
| */ |
| ``` |
| |
| Using `Enumerate`, we can append the index for each item and then compute a score from it. |
| |
| ```pig |
| define Enumerate datafu.pig.bags.Enumerate(); |
| |
| data = FOREACH data GENERATE Enumerate(A) as A; |
| data = FOREACH data { |
| A = FOREACH A GENERATE v1, 1/(double)(i+1) as score; |
| GENERATE A; |
| } |
| |
| /* Produces: |
| ({(a,1.0),(b,0.5),(c,0.3333333333333333),(d,0.25)}) |
| */ |
| ``` |
| |
| This bag can then be passed into `WeightedSample`. This produces a simple random sample where the items in the |
| beginning of the bag are more likely to be selected. |
| |
| ### Consistently Sampling By Key |
| |
| A common use case for sampling is selecting a set of training examples for building a prediction model. |
| For example, suppose that we have a recommendation system where we have tracked when items have been |
| impressed to users and when they have clicked on them: |
| |
| ```pig |
| impressions = |
| LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long); |
| clicks = |
| LOAD '$accepts' AS (user_id:int, item_id:int, timestamp:long); |
| ``` |
| |
| Using this data we would like to build a model that can predict user behavior so that we can show items |
| to users that they are more likely to click on. Since the data may be very large, we need to take a |
| sample that is easier to work with. We basically want to join on `(user_id,item_id)`, sample the result |
| and product training data with the following format: |
| |
| ```pig |
| {(user_id:int, item_id:int, is_impressed:int, is_clicked:int} |
| ``` |
| |
| The problem with this approach though is that the join can be very expensive if the data size is large. |
| Sampling reduces the data size, but it has to be applied after the join because the same `(user_id,item_id)` |
| pairs won't be selected from `impressions` and `clicks`. |
| |
| [SampleByKey](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/sampling/SampleByKey.html) |
| solves this problem by allowing us to sample consistently across multiple relations. By this we mean |
| that if a "key" appears in the sample output for one relation, then the same key will appear in every |
| other relation as well. This guarantees that we can apply the join after sampling. |
| |
| This is essentially Bernoulli sampling. But the "random number" in this case is derived by applying a cryptographic |
| hash to the key, rather than by invoking a pseudo-random number generator. |
| |
| Let's see how we can apply this to our example above. We want to take a 10% sample of the joined clicks |
| and impressions. We start by defining the UDF: |
| |
| ```pig |
| DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.1'); |
| ``` |
| |
| Since we are going to be joining on `(user_id,item_id)`, we need sample using this pair: |
| |
| ```pig |
| impressions = FILTER impressions BY SampleByKey(user_id,item_id); |
| clicks = FILTER clicks BY SampleByKey(user_id,item_id); |
| ``` |
| |
| We can now join the impressions and clicks, with the knowledge that the same `(user_id,item_id)` |
| pairs will appear in both samples. |
| |
| ```pig |
| joined_sample = FOREACH (COGROUP impressions BY (user_id,item_id), |
| clicks BY (user_id,item_id)) GENERATE |
| group.user_id as user_id, |
| group.item_id as item_id, |
| ((SIZE(impressions) > 0 ? 1 : 0)) as is_impressed, |
| ((SIZE(clicks) > 0 ? 1 : 0)) as is_clicked; |
| ``` |
| |
| Since we have sampled before joining the data, this should be much more efficient. |
| |
| ### Manually Sampling By Key |
| |
| All the previous methods are based on random selection. If you wish to create a sample of a given table based on a (manually created) table of ids, you can use the _sample\_by\_keys_ macro. |
| |
| For example, lets assume we have a list of customers are stored on the HDFS as _customers.csv_, and our list for the sample are in _sample.csv_, which only contains customers 2, 4 and 6 from the original _customers.csv_. |
| |
| We can use the following Pig script: |
| |
| ```pig |
| REGISTER datafu-pig-<%= current_page.data.version %>.jar; |
| |
| IMPORT 'datafu/sample_by_keys.pig'; |
| |
| data = LOAD 'customers.csv' USING PigStorage(',') AS (id: int, name: chararray, purchases: int, updated: chararray); |
| |
| customers = LOAD 'sample.csv' AS (cust_id: int); |
| |
| sampled = sample_by_keys(data, customers, id, cust_id); |
| |
| STORE sampled INTO 'sample_out'; |
| ``` |
| |
| The result will be all the records from our original table for customers 2, 4 and 6. |