| --- |
| title: Bag Operations - Guide - Apache DataFu Pig |
| version: 1.6.0 |
| section_name: Apache DataFu Pig - Guide |
| license: > |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --- |
| |
| ## Bag operations |
| |
| Often when working with data in Pig, it makes sense to keep the data grouped by one or more fields, |
| which means you are working with bags. Unfortunately there aren't many convenient ways to work |
| with bags in Pig out of the box. For this reason Apache DataFu provides several UDFs for performing useful |
| operations on bags that come up in practice. |
| |
| ### Counting Items in Bags |
| |
| The [CountEach](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/CountEach.html) UDF |
| can be used to count the number of instances of items within a bag. It produces a new bag of the |
| distinct items with their respective counts appended. |
| |
| Let's take a look at an example where this might be useful. |
| Suppose that we have a recommendation system, and we've tracked what items have been recommended. |
| |
| ```pig |
| items = FOREACH items GENERATE memberId, itemId; |
| ``` |
| |
| Let's say that we want to compute the number of times an item has been shown to each user. |
| Our output will have this schema: |
| |
| ``` |
| {memberId:int, items: {{itemId:long, cnt:long}}} |
| ``` |
| |
| Typically we would have to perform to `GROUP` operations to get this output. First we group |
| by `(memberId,itemId)`, count, and then group a second time. This requires two MapReduce jobs. |
| |
| To make this case more efficient, we can use the `CountEach` UDF. |
| It will produce the same output, but it only requires a single `GROUP` operation: |
| |
| ```pig |
| DEFINE CountEach datafu.pig.bags.CountEach(); |
| |
| items = FOREACH (GROUP items BY memberId) GENERATE |
| group as memberId, |
| CountEach(items.(itemId)) as items; |
| ``` |
| |
| ### Bag Concatenation |
| |
| [BagConcat](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagConcat.html) can be used |
| to concatenate the tuples from two or more bags into a single bag: |
| |
| ```pig |
| define BagConcat datafu.pig.bags.BagConcat(); |
| |
| -- ({(1),(2),(3)},{(4),(5)},{(6),(7)}) |
| input = LOAD 'input' AS (B1: bag{T: tuple(v:INT)}, B2: bag{T: tuple(v:INT)}, B3: bag{T: tuple(v:INT)}); |
| |
| -- ({(1),(2),(3),(4),(5),(6),(7)}) |
| output = FOREACH input GENERATE BagConcat(B1,B2,B3); |
| ``` |
| [BagConcat](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagConcat.html) can also be |
| used to concatenate all tuples present in a bag of bags. |
| |
| ```pig |
| define BagConcat datafu.pig.bags.BagConcat(); |
| -- ({({(1),(2),(3)}),({(3),(4),(5)})}) |
| input = LOAD 'input' AS (A: bag{T: tuple(bag{T2: tuple(v:INT)})}); |
| |
| -- ({(1),(2),(3),(3),(4),(5)}) |
| output = FOREACH input GENERATE BagConcat(A); |
| ``` |
| |
| ### Grouping Within a Bag |
| |
| Pig has a `GROUP` operation that can be applied to a relation. It produces a new relation where the input |
| tuples are grouped by a particular key. A bag in the relation contains the grouped tuples for that key. The key |
| is represented by a `group` parameter. |
| |
| [BagGroup](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagGroup.html) mimics the `GROUP` |
| operation from Pig. The difference between them is that `BagGroup` operates within a bag, rather than on a relation. |
| This can be useful when operating on bags that are not very large in size. We can operate on the tuples within |
| this bag without involving `GROUP`, which would result in another MapReduce job. With `BagGroup` we can avoid this. |
| |
| In the following example we take an `input_bag` consisting of key-value pairs `(k,v)` and group the tuples by `k`. |
| This produces a new bag having tuples consisting of `group` and `input_bag`. The `group` corresponds to the grouping |
| key `k`. The `input_bag` is a bag containing the tuples from the original `input_bag` that have the same `k` as `group`. |
| |
| ```pig |
| define BagGroup datafu.pig.bags.BagGroup(); |
| |
| data = LOAD 'input' AS (input_bag: bag {T: tuple(k: int, v: chararray)}); |
| -- ({(1,A),(1,B),(2,A),(2,B),(2,C),(3,A)}) |
| |
| -- Group input_bag by k |
| data2 = FOREACH data GENERATE BagGroup(input_bag, input_bag.(k)) as grouped; |
| -- data2: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}} |
| -- ({(1,{(1,A),(1,B)}),(2,{(2,A),(2,B),(2,C)}),(3,{(3,A)})}) |
| ``` |
| |
| We could also project out the key from the final `input_bag` using a nested `FOREACH` so that the bag only |
| consists of the value `v`: |
| |
| ```pig |
| data3 = FOREACH data2 { |
| -- project only the value |
| projected = FOREACH grouped GENERATE group, input_bag.(v); |
| GENERATE projected as grouped; |
| } |
| |
| -- data3: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}} |
| -- ({(1,{(A),(B)}),(2,{(A),(B),(C)}),(3,{(A)})}) |
| ``` |
| |
| ### Append to Bag |
| |
| [AppendToBag](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/AppendToBag.html) can be |
| used to append a tuple to a bag: |
| |
| ```pig |
| define AppendToBag datafu.pig.bags.AppendToBag(); |
| |
| -- ({(1),(2),(3)},(4)) |
| input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT)); |
| |
| -- ({(1),(2),(3),(4)}) |
| output = FOREACH input GENERATE AppendToBag(B,T); |
| ``` |
| |
| ### Prepend to Bag |
| |
| [PrependToBag](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/PrependToBag.html) can be |
| used to prepend a tuple to a bag: |
| |
| ```pig |
| define PrependToBag datafu.pig.bags.PrependToBag(); |
| |
| -- ({(1),(2),(3)},(4)) |
| input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT)); |
| |
| -- ({(4),(1),(2),(3)}) |
| output = FOREACH input GENERATE PrependToBag(B,T); |
| ``` |
| |
| ### Join Bags |
| |
| Pig has a `JOIN` operator, but unfortunately it only operates on relations. Thus, if you wish to join |
| tuples from two bags, you must first flatten, then join, then re-group. To make this process simpler DataFu |
| provides a [BagLeftOuterJoin](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagLeftOuterJoin.html) |
| UDF. |
| |
| Let's walk through an example where this is useful. Suppose that we are building a recommendation system. |
| This system recommends items to users, and these recommendations may be ignored, accepted, or rejected. |
| When analyzing this system, we have a stream of impression, accept, and reject events: |
| |
| ```pig |
| impressions = LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long); |
| accepts = LOAD '$accepts' AS (user_id:int, item_id:int, timestamp:long); |
| rejects = LOAD '$rejects' AS (user_id:int, item_id:int, timestamp:long); |
| ``` |
| |
| What we want to produce from this data is a bag of item counts per member: |
| |
| ``` |
| features: {user_id:int, items:{(item_id:int, impression_count:int, accept_count:int, reject_count:int)}} |
| ``` |
| |
| Using DataFu's |
| [CountEach](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/CountEach.html) |
| we can efficiently produce the counts per item for impressions, accepts, and rejects as separate |
| bags per member using a single MapReduce job: |
| |
| ```pig |
| define CountEach datafu.pig.bags.CountEach(); |
| |
| features_counted = FOREACH (COGROUP impressions BY user_id, |
| accepts BY user_id, |
| rejects BY user_id) GENERATE |
| group as user_id, |
| CountEach(impressions.item_id) as impressions, |
| CountEach(accepts.item_id) as accepts, |
| CountEach(rejects.item_id) as rejects; |
| ``` |
| |
| This produces three bags, consisting of `(item_id,count)`. We can now join these bags |
| together using `BagLeftOuterJoin`: |
| |
| ```pig |
| define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin(); |
| |
| features_joined = FOREACH features_counted GENERATE |
| user_id, |
| BagLeftOuterJoin( |
| impressions, 'item_id', |
| accepts, 'item_id', |
| rejects, 'item_id' |
| ) as items; |
| ``` |
| |
| We left join in the impression here since the user cannot accept or reject an item that was not seen. |
| The left join can of course produce null values for accepts and rejects that did not occur, so let's |
| clean those up by replacing null values with counts of zero: |
| |
| ```pig |
| define Coalesce datafu.pig.util.Coalesce(); |
| |
| features = FOREACH features_joined { |
| projected = FOREACH items GENERATE |
| impressions::item_id as item_id, |
| impressions::count as impression_count, |
| Coalesce(accepts::count, 0) as accept_count, |
| Coalesce(rejects::count, 0) as reject_count; |
| GENERATE user_id, projected as items; |
| } |
| ``` |