blob: 0624d8c3d5444c0215f1a217b900c7284077dc74 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.optimizer;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.pig.PigRunner;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.Util;
import org.junit.Test;
/**
*
* See: https://issues.apache.org/jira/browse/PIG-3020
*
*/
public class TestSchemaResetter {
@Test
public void testSchemaResetter() throws IOException {
new File("build/test/tmp/").mkdirs();
Util.createLocalInputFile("build/test/tmp/TestSchemaResetter.pig", new String[] {
"A = LOAD 'foo' AS (group:tuple(uid, dst_id));",
"edges_both = FOREACH A GENERATE",
" group.uid AS src_id,",
" group.dst_id AS dst_id;",
"both_counts = GROUP edges_both BY src_id;",
"both_counts = FOREACH both_counts GENERATE",
" group AS src_id, SIZE(edges_both) AS size_both;",
"",
"edges_bq = FOREACH A GENERATE",
" group.uid AS src_id,",
" group.dst_id AS dst_id;",
"bq_counts = GROUP edges_bq BY src_id;",
"bq_counts = FOREACH bq_counts GENERATE",
" group AS src_id, SIZE(edges_bq) AS size_bq;",
"",
"per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;",
"store per_user_set_sizes into 'foo';"
});
assertEquals(0, PigRunner.run(new String[] {"-x", "local", "-c", "build/test/tmp/TestSchemaResetter.pig" } , null).getReturnCode());
}
@Test
public void testSchemaResetterExec() throws Exception {
PigServer pigServer = new PigServer(Util.getLocalTestMode());
Data data = Storage.resetData(pigServer);
data.set("input",
tuple(tuple("1", "2")),
tuple(tuple("2", "3")),
tuple(tuple("2", "4")));
pigServer.registerQuery(
"A = LOAD 'input' USING mock.Storage() AS (group:tuple(uid, dst_id));" +
"edges_both = FOREACH A GENERATE" +
" group.uid AS src_id," +
" group.dst_id AS dst_id;" +
"both_counts = GROUP edges_both BY src_id;" +
"both_counts = FOREACH both_counts GENERATE" +
" group AS src_id, SIZE(edges_both) AS size_both;" +
"edges_bq = FOREACH A GENERATE" +
" group.uid AS src_id," +
" group.dst_id AS dst_id;" +
"bq_counts = GROUP edges_bq BY src_id;" +
"bq_counts = FOREACH bq_counts GENERATE" +
" group AS src_id, SIZE(edges_bq) AS size_bq;" +
"per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;" +
"store per_user_set_sizes into 'output' USING mock.Storage();");
List<Tuple> list = data.get("output");
Collections.sort(list);
assertEquals("list: "+list, 2, list.size());
assertEquals("(1,1,1,1)", list.get(0).toString());
assertEquals("(2,2,2,2)", list.get(1).toString());
}
}