blob: e9e7fb4199292666574629df3bc5af4d0f4ff5e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
/*
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Iterator;
import java.util.Random;
*/
import java.util.*;
import java.io.IOException;
import org.junit.Test;
import org.apache.pig.data.*;
import org.apache.pig.impl.eval.*;
import org.apache.pig.impl.util.Spillable;
/**
 * Exercises the spillable {@link DataBag} implementations (default, sorted and
 * distinct). Each test adds tuples both to a bag and to a reference
 * collection, optionally forces spills through a local memory manager (before
 * or in the middle of iteration), and then checks that the bag yields exactly
 * the reference tuples, in the expected order, with nothing left over.
 *
 * @author dnm
 */
public class TestDataBag extends junit.framework.TestCase {

    // Source of tuple values for the sorted and distinct tests.
    private final Random rand = new Random();

    /**
     * Minimal stand-in for the real memory manager: it remembers every
     * registered Spillable so a test can force all of them to spill on demand.
     */
    private static class TestMemoryManager {
        private final List<Spillable> mManagedObjects = new ArrayList<Spillable>();

        public void register(Spillable s) {
            mManagedObjects.add(s);
        }

        /** Calls spill() on every registered object, in registration order. */
        public void forceSpill() throws IOException {
            for (Spillable s : mManagedObjects) {
                s.spill();
            }
        }
    }

    /**
     * Replaces the regular bag factory so that every bag created is registered
     * with the test's local memory manager.
     */
    private static class LocalBagFactory {
        private final TestMemoryManager mMemMgr;

        public LocalBagFactory(TestMemoryManager mgr) {
            mMemMgr = mgr;
        }

        public DataBag newDefaultBag() {
            return register(new DefaultDataBag());
        }

        public DataBag newSortedBag(EvalSpec sortSpec) {
            return register(new SortedDataBag(sortSpec));
        }

        public DataBag newDistinctBag() {
            return register(new DistinctDataBag());
        }

        /** Registers the bag with the memory manager and returns it. */
        private DataBag register(DataBag bag) {
            mMemMgr.register(bag);
            return bag;
        }
    }

    /**
     * Wraps {@code value} in a single-field tuple and adds that same tuple to
     * both the bag under test and the reference answer.
     */
    private static void addToBoth(DataBag bag, Collection<Tuple> answer, int value) {
        Tuple t = new Tuple(new DataAtom(value));
        bag.add(t);
        answer.add(t);
    }

    /**
     * Empties {@code queue} by repeated poll(), returning its tuples in
     * priority (natural, ascending) order. A PriorityQueue's own iterator is
     * not ordered, so it cannot be compared against the bag directly.
     */
    private static List<Tuple> drainInOrder(PriorityQueue<Tuple> queue) {
        List<Tuple> ordered = new ArrayList<Tuple>(queue.size());
        Tuple t;
        while ((t = queue.poll()) != null) {
            ordered.add(t);
        }
        return ordered;
    }

    /** Asserts that the bag has another tuple and that it equals {@code expected}. */
    private static void assertNextTuple(Iterator<Tuple> bagIter, Tuple expected) {
        assertTrue("bag ran out of tuples before answer", bagIter.hasNext());
        // JUnit's argument order is (message, expected, actual); keeping it
        // makes failure messages report the two tuples the right way round.
        assertEquals("tuples should be the same", expected, bagIter.next());
    }

    /**
     * Asserts that each of the next {@code count} tuples from the bag equals
     * the corresponding next tuple from {@code expectedIter}.
     */
    private static void assertNextTuples(Iterator<Tuple> bagIter,
                                         Iterator<Tuple> expectedIter,
                                         int count) {
        for (int i = 0; i < count; i++) {
            assertNextTuple(bagIter, expectedIter.next());
        }
    }

    /**
     * Asserts that the bag yields exactly the remaining expected tuples, in
     * order, and is then exhausted.
     */
    private static void assertRemainingTuples(Iterator<Tuple> bagIter,
                                              Iterator<Tuple> expectedIter) {
        while (expectedIter.hasNext()) {
            assertNextTuple(bagIter, expectedIter.next());
        }
        assertFalse("right answer ran out of tuples before the bag",
            bagIter.hasNext());
    }

    // ---------------------------------------------------------------------
    // Default bags: tuples must come back in insertion order.
    // ---------------------------------------------------------------------

    // Test reading and writing a default bag from memory, no spills.
    @Test
    public void testDefaultInMemory() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDefaultBag();
        List<Tuple> rightAnswer = new ArrayList<Tuple>(10);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, i);
        }
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading and writing a default bag from file with one spill.
    @Test
    public void testDefaultSingleSpill() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDefaultBag();
        List<Tuple> rightAnswer = new ArrayList<Tuple>(10);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, i);
        }
        mgr.forceSpill();
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading and writing a default bag from file with three spills.
    @Test
    public void testDefaultTripleSpill() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDefaultBag();
        List<Tuple> rightAnswer = new ArrayList<Tuple>(30);
        for (int j = 0; j < 3; j++) {
            for (int i = 0; i < 10; i++) {
                addToBoth(b, rightAnswer, i);
            }
            mgr.forceSpill();
        }
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading a default bag with some tuples in file, some in memory.
    @Test
    public void testDefaultInMemInFile() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDefaultBag();
        List<Tuple> rightAnswer = new ArrayList<Tuple>(20);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, i);
        }
        mgr.forceSpill();
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, i);
        }
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading a default bag with a spill happening in the middle of the read.
    @Test
    public void testDefaultSpillDuringRead() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDefaultBag();
        List<Tuple> rightAnswer = new ArrayList<Tuple>(20);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, i);
        }
        mgr.forceSpill();
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, i);
        }
        Iterator<Tuple> bIter = b.iterator();
        Iterator<Tuple> rIter = rightAnswer.iterator();
        assertNextTuples(bIter, rIter, 15);
        mgr.forceSpill();
        assertRemainingTuples(bIter, rIter);
    }

    // ---------------------------------------------------------------------
    // Sorted bags (created with a null sort spec): tuples must come back in
    // the same order a PriorityQueue of the same tuples polls them.
    // ---------------------------------------------------------------------

    // Test reading and writing a sorted bag from memory, no spills.
    @Test
    public void testSortedInMemory() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newSortedBag(null);
        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, rand.nextInt());
        }
        assertRemainingTuples(b.iterator(), drainInOrder(rightAnswer).iterator());
    }

    // Test reading and writing a sorted bag from file with one spill.
    @Test
    public void testSortedSingleSpill() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newSortedBag(null);
        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, rand.nextInt());
        }
        mgr.forceSpill();
        assertRemainingTuples(b.iterator(), drainInOrder(rightAnswer).iterator());
    }

    // Test reading and writing a sorted bag from file with three spills.
    @Test
    public void testSortedTripleSpill() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newSortedBag(null);
        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
        for (int j = 0; j < 3; j++) {
            for (int i = 0; i < 10; i++) {
                addToBoth(b, rightAnswer, rand.nextInt());
            }
            mgr.forceSpill();
        }
        assertRemainingTuples(b.iterator(), drainInOrder(rightAnswer).iterator());
    }

    // Test reading a sorted bag with some tuples in file, some in memory.
    @Test
    public void testSortedInMemInFile() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newSortedBag(null);
        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, rand.nextInt());
        }
        mgr.forceSpill();
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, rand.nextInt());
        }
        assertRemainingTuples(b.iterator(), drainInOrder(rightAnswer).iterator());
    }

    // Test reading a sorted bag with a spill happening in the middle of the read.
    @Test
    public void testSortedSpillDuringRead() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newSortedBag(null);
        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, rand.nextInt());
        }
        mgr.forceSpill();
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, rand.nextInt());
        }
        Iterator<Tuple> bIter = b.iterator();
        Iterator<Tuple> rIter = drainInOrder(rightAnswer).iterator();
        assertNextTuples(bIter, rIter, 15);
        mgr.forceSpill();
        assertRemainingTuples(bIter, rIter);
    }

    // Test reading a sorted bag whose first spill happens in the middle of the read.
    @Test
    public void testSortedFirstSpillDuringRead() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newSortedBag(null);
        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
        for (int i = 0; i < 10; i++) {
            addToBoth(b, rightAnswer, rand.nextInt());
        }
        Iterator<Tuple> bIter = b.iterator();
        Iterator<Tuple> rIter = drainInOrder(rightAnswer).iterator();
        assertNextTuples(bIter, rIter, 5);
        mgr.forceSpill();
        assertRemainingTuples(bIter, rIter);
    }

    // Test reading and writing a sorted bag with so many spills that it
    // requires a pre-merge.
    @Test
    public void testSortedPreMerge() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newSortedBag(null);
        PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
        for (int j = 0; j < 373; j++) {
            for (int i = 0; i < 10; i++) {
                addToBoth(b, rightAnswer, rand.nextInt());
            }
            mgr.forceSpill();
        }
        assertRemainingTuples(b.iterator(), drainInOrder(rightAnswer).iterator());
    }

    // ---------------------------------------------------------------------
    // Distinct bags: tuples must come back deduplicated and in TreeSet
    // (natural) order. Values of rand.nextInt() % 5 lie in -4..4, so
    // duplicates are frequent.
    // ---------------------------------------------------------------------

    // Test reading and writing a distinct bag from memory, no spills.
    @Test
    public void testDistinctInMemory() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDistinctBag();
        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
        for (int i = 0; i < 50; i++) {
            addToBoth(b, rightAnswer, rand.nextInt() % 5);
        }
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading and writing a distinct bag from file with one spill.
    @Test
    public void testDistinctSingleSpill() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDistinctBag();
        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
        for (int i = 0; i < 50; i++) {
            addToBoth(b, rightAnswer, rand.nextInt() % 5);
        }
        mgr.forceSpill();
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading and writing a distinct bag from file with three spills.
    @Test
    public void testDistinctTripleSpill() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDistinctBag();
        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
        for (int j = 0; j < 3; j++) {
            for (int i = 0; i < 50; i++) {
                addToBoth(b, rightAnswer, rand.nextInt() % 5);
            }
            mgr.forceSpill();
        }
        assertEquals("Size of distinct data bag is incorrect",
            rightAnswer.size(), b.size());
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading a distinct bag with some tuples in file, some in memory.
    @Test
    public void testDistinctInMemInFile() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDistinctBag();
        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
        for (int i = 0; i < 50; i++) {
            addToBoth(b, rightAnswer, rand.nextInt() % 5);
        }
        mgr.forceSpill();
        for (int i = 0; i < 50; i++) {
            addToBoth(b, rightAnswer, i);
        }
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }

    // Test reading a distinct bag with a spill happening in the middle of the read.
    @Test
    public void testDistinctSpillDuringRead() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDistinctBag();
        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
        for (int i = 0; i < 50; i++) {
            addToBoth(b, rightAnswer, rand.nextInt() % 5);
        }
        mgr.forceSpill();
        for (int i = 0; i < 50; i++) {
            addToBoth(b, rightAnswer, i);
        }
        Iterator<Tuple> bIter = b.iterator();
        Iterator<Tuple> rIter = rightAnswer.iterator();
        assertNextTuples(bIter, rIter, 5);
        mgr.forceSpill();
        assertRemainingTuples(bIter, rIter);
    }

    // Test reading and writing a distinct bag with enough spills to force a
    // pre-merge.
    @Test
    public void testDistinctPreMerge() throws Exception {
        TestMemoryManager mgr = new TestMemoryManager();
        LocalBagFactory factory = new LocalBagFactory(mgr);
        DataBag b = factory.newDistinctBag();
        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
        for (int j = 0; j < 321; j++) {
            for (int i = 0; i < 50; i++) {
                addToBoth(b, rightAnswer, rand.nextInt() % 5);
            }
            mgr.forceSpill();
        }
        assertRemainingTuples(b.iterator(), rightAnswer.iterator());
    }
}