blob: 98e7e97bae42e6b3ac6b41fa63dc359c3af089f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
public class TestJsonLoaderStorage {
private static final String schema =
"a:boolean," +
"b:int," +
"c:long," +
"d:float," +
"e:double," +
"f:datetime," +
"g:bytearray," +
"h:chararray," +
"i:biginteger," +
"j:bigdecimal," +
"k:map[chararray]," +
"l:tuple(a:int)," +
"m:bag{a:tuple(a:int)}";
private static final String rawInput =
"true\t" +
"123\t" +
"456\t" +
"7.89\t" +
"0.12\t" +
"2013-01-02T03:04:05.123+03:00\t" +
"abc\t" +
"def\t" +
"123456789\t" +
"1234.6789\t" +
"[a#ghi]\t" +
"(123)\t" +
"{(123),(456),(789)}";
private static final String nullInput =
"\t\t\t\t\t\t\t\t\t\t\t\t";
private static final String json =
"{" +
"\"a\":true," +
"\"b\":123," +
"\"c\":456," +
"\"d\":7.89," +
"\"e\":0.12," +
"\"f\":\"2013-01-02T03:04:05.123+03:00\"," +
"\"g\":\"abc\"," +
"\"h\":\"def\"," +
"\"i\":123456789," +
"\"j\":1234.6789," +
"\"k\":{\"a\":\"ghi\"}," +
"\"l\":{\"a\":123}," +
"\"m\":[{\"a\":123},{\"a\":456},{\"a\":789}]" +
"}";
private static final String arraysJson =
"{" +
"\"s\":[\"abc\",\"def\",\"ghi\"]," +
"\"i\":[23,45,78]," +
"\"f\":[23.1,45.2,78.3]" +
"}";
private static final String nullJson =
"{" +
"\"a\":null," +
"\"b\":null," +
"\"c\":null," +
"\"d\":null," +
"\"e\":null," +
"\"f\":null," +
"\"g\":null," +
"\"h\":null," +
"\"i\":null," +
"\"j\":null," +
"\"k\":null," +
"\"l\":null," +
"\"m\":null" +
"}";
private static final String bigDecimalJson =
"{" +
"\"a\":123.456," +
"\"b\":\"123.456\"" +
"}";
private static final String badJson =
"{" +
"\"a\":\"good\"," +
"\"b\":\"good\"" +
"}\n" +
"{" +
"\"a\":bad," +
"\"b\":\"good\"" +
"}\n" +
"{" +
"\"a\":\"good\"," +
"\"b\":\"good\"" +
"}";
private static final String jsonOutput =
"{\"f1\":\"18\",\"count\":3}";
private PigServer pigServer;
@Before
public void setup() throws Exception {
pigServer = new PigServer(Util.getLocalTestMode());
}
private String getTempOutputPath() throws IOException {
File tempFile = File.createTempFile("json", null);
tempFile.delete();
tempFile.deleteOnExit();
String path = tempFile.getAbsolutePath();
if (Util.WINDOWS) {
path = path.replace('\\', '/');
}
return path;
}
private String createInput(String input) throws IOException {
File tempInputFile = File.createTempFile("input", null);
tempInputFile.deleteOnExit();
FileWriter w = new FileWriter(tempInputFile);
w.write(input);
w.close();
String pathInputFile = tempInputFile.getAbsolutePath();
if (Util.WINDOWS) {
pathInputFile = pathInputFile.replace('\\', '/');
}
return pathInputFile;
}
private Iterator<Tuple> loadJson(String input) throws IOException {
String path = createInput(input);
pigServer.registerQuery("data = load '" + path
+ "' using JsonLoader('" + schema + "');");
return pigServer.openIterator("data");
}
private BufferedReader storeJson(String input) throws Exception {
String pathInputFile = createInput(input);
String pathJsonFile = getTempOutputPath();
pigServer.registerQuery("data = load '" + pathInputFile
+ "' as (" + schema + ");");
pigServer.registerQuery("store data into '" + pathJsonFile
+ "' using JsonStorage();");
Path p = new Path(pathJsonFile);
FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
Reader r = new InputStreamReader(fs.open(Util.getFirstPartFile(p)));
BufferedReader br = new BufferedReader(r);
return br;
}
@SuppressWarnings("rawtypes")
@Test
public void testJsonLoader() throws IOException {
Iterator<Tuple> tuples = loadJson(json);
int count = 0;
while(tuples.hasNext()) {
Tuple tuple = tuples.next();
assertEquals(Boolean.class, tuple.get(0).getClass());
assertEquals(Integer.class, tuple.get(1).getClass());
assertEquals(Long.class, tuple.get(2).getClass());
assertEquals(Float.class, tuple.get(3).getClass());
assertEquals(Double.class, tuple.get(4).getClass());
assertEquals(DateTime.class, tuple.get(5).getClass());
assertEquals("2013-01-02T03:04:05.123+03:00", tuple.get(5).toString());
assertEquals(DataByteArray.class, tuple.get(6).getClass());
assertEquals(String.class, tuple.get(7).getClass());
assertEquals(BigInteger.class, tuple.get(8).getClass());
assertEquals(BigDecimal.class, tuple.get(9).getClass());
assertTrue(tuple.get(10) instanceof Map);
assertEquals(String.class, ((Map)tuple.get(10)).get("a").getClass());
assertTrue(tuple.get(11) instanceof Tuple);
assertEquals(Integer.class, ((Tuple)tuple.get(11)).get(0).getClass());
assertTrue(tuple.get(12) instanceof DataBag);
DataBag bag = (DataBag)tuple.get(12);
assertEquals(3, bag.size());
Iterator<Tuple> bagTuples = bag.iterator();
while(bagTuples.hasNext()) {
assertEquals(Integer.class, bagTuples.next().get(0).getClass());
}
count++;
}
assertEquals(1, count);
}
@SuppressWarnings("rawtypes")
@Test
public void testJsonLoaderBadRow() throws IOException{
String badJsonFile = createInput(badJson);
pigServer.registerQuery("data = load '" + badJsonFile + "' using JsonLoader('a:chararray, b:chararray');");
Iterator<Tuple> tuples = pigServer.openIterator("data");
Tuple t = tuples.next();
assertTrue(t.size()==2);
assertTrue(t.get(0)!=null);
assertTrue(t.get(1)!=null);
assertTrue(tuples.hasNext());
// bad row - skip it, returning a null tuple.
t = tuples.next();
assertTrue(t.size()==2);
assertTrue(t.get(0)==null);
assertTrue(t.get(1)==null);
assertTrue(tuples.hasNext());
t = tuples.next();
assertTrue(t.size()==2);
assertTrue(t.get(0)!=null);
assertTrue(t.get(1)!=null);
assertTrue(!tuples.hasNext());
}
@SuppressWarnings("rawtypes")
@Test
public void testJsonLoaderArrays() throws IOException{
String arraysJsonFile = createInput(arraysJson);
pigServer.registerQuery("data = load '" + arraysJsonFile + "' using JsonLoader('s:bag{a:tuple(a:chararray)}, i:bag{a:tuple(a:int)}, f:bag{a:tuple(a:double)}');");
Iterator<Tuple> tuples = pigServer.openIterator("data");
Tuple t = tuples.next();
assertTrue(t.size()==3);
assertTrue(t.get(0)!=null);
assertTrue(t.get(1)!=null);
assertTrue(t.get(2)!=null);
assertTrue(! tuples.hasNext());
}
@SuppressWarnings("rawtypes")
@Test
public void testJsonLoaderBigDecimalFormats() throws IOException{
String bigDecimalJsonFile = createInput(bigDecimalJson);
pigServer.registerQuery("data = load '" + bigDecimalJsonFile + "' using JsonLoader('a:bigdecimal, b:bigdecimal');");
Iterator<Tuple> tuples = pigServer.openIterator("data");
Tuple t = tuples.next();
assertTrue(t.size()==2);
assertTrue(t.get(0)!=null);
assertTrue(t.get(1)!=null);
assertEquals(t.get(0), t.get(1));
assertTrue(!tuples.hasNext());
}
@Test
public void testJsonLoaderNull() throws IOException {
Iterator<Tuple> tuples = loadJson(nullJson);
int count = 0;
while(tuples.hasNext()) {
Tuple tuple = tuples.next();
assertEquals(null, tuple.get(0));
assertEquals(null, tuple.get(1));
assertEquals(null, tuple.get(2));
assertEquals(null, tuple.get(3));
assertEquals(null, tuple.get(4));
assertEquals(null, tuple.get(5));
assertEquals(null, tuple.get(6));
assertEquals(null, tuple.get(7));
assertEquals(null, tuple.get(8));
assertEquals(null, tuple.get(9));
assertEquals(null, tuple.get(10));
assertEquals(null, tuple.get(11));
assertEquals(null, tuple.get(12));
count++;
}
assertEquals(1, count);
}
@Test
public void testJsonStorage() throws Exception {
BufferedReader br = storeJson(rawInput);
String data = br.readLine();
assertEquals(json, data);
String line = data;
int count = 0;
while (line != null) {
line = br.readLine();
count++;
}
assertEquals(1, count);
br.close();
}
@Test
public void testJsonStorageNull() throws Exception {
BufferedReader br = storeJson(nullInput);
String data = br.readLine();
assertEquals(nullJson, data);
String line = data;
int count = 0;
while (line != null) {
line = br.readLine();
count++;
}
assertEquals(1, count);
br.close();
}
@Test
public void testJsonLoaderStorage() throws Exception {
String pattInputFile = createInput(rawInput);
String pattJsonFile = getTempOutputPath();
String pattJson2File = getTempOutputPath();
pigServer.registerQuery("data = load '" + pattInputFile
+ "' as (" + schema + ");");
pigServer.registerQuery("store data into '" + pattJsonFile
+ "' using JsonStorage();");
pigServer.registerQuery("json = load '" + pattJsonFile
+ "' using JsonLoader('" + schema + "');");
pigServer.registerQuery("store json into '" + pattJson2File
+ "' using JsonStorage();");
Path p = new Path(pattJson2File);
FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
Reader r = new InputStreamReader(fs.open(Util.getFirstPartFile(p)));
BufferedReader br = new BufferedReader(r);
String data = br.readLine();
assertEquals(json, data);
String line = data;
int count = 0;
while (line != null) {
line = br.readLine();
count++;
}
assertEquals(1, count);
br.close();
}
@Test
public void testJsonStorageLimit() throws Exception {
String outPath = getTempOutputPath();
Data data = Storage.resetData(pigServer);
data.set("foo", tuple(1), tuple(2), tuple(3), tuple(4));
pigServer.registerQuery("data = load 'foo' using mock.Storage() as (id:int);");
pigServer.registerQuery("data = order data by id;");
pigServer.registerQuery("data = limit data 2;");
pigServer.registerQuery("store data into '" + outPath + "' using JsonStorage();");
Path p = new Path(outPath);
FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
Reader r = new InputStreamReader(fs.open(Util.getFirstPartFile(p)));
BufferedReader br = new BufferedReader(r);
String line = null;
int count = 0;
while ((line = br.readLine()) != null) {
count++;
assertEquals("{\"id\":" + count + "}", line);
}
assertEquals(2, count);
br.close();
}
@Test
public void testSimpleMapSideStreaming() throws Exception {
File input = Util.createInputFile("tmp", "", new String [] {"1,2,3;4,5,6,7,8",
"1,2,3;4,5,6,7,9",
"1,2,3;4,5,6,7,18"});
File tempJsonFile = File.createTempFile("json", "");
tempJsonFile.delete();
// Pig query to run
pigServer.registerQuery("IP = load '"+ Util.generateURI(input.toString(), pigServer.getPigContext())
+"' using PigStorage (';') as (ID:chararray,DETAILS:chararray);");
pigServer.registerQuery(
"id_details = FOREACH IP GENERATE " +
"FLATTEN" +
"(STRSPLIT" +
"(ID,',',3)) AS (drop, code, transaction) ," +
"FLATTEN" +
"(STRSPLIT" +
"(DETAILS,',',5)) AS (lname, fname, date, price, product);");
pigServer.registerQuery(
"transactions = FOREACH id_details GENERATE $0 .. ;");
pigServer.registerQuery(
"transactionsG = group transactions by code;");
pigServer.registerQuery(
"uniqcnt = foreach transactionsG {"+
"sym = transactions.product ;"+
"dsym = distinct sym ;"+
"generate flatten(dsym.product) as f1, COUNT(dsym) as count ;" +
"};");
pigServer.store("uniqcnt", tempJsonFile.getAbsolutePath(), "JsonStorage");
Path p = new Path(tempJsonFile.getAbsolutePath());
FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
Reader r = new InputStreamReader(fs.open(Util.getFirstPartFile(p)));
BufferedReader br = new BufferedReader(r);
String data = br.readLine();
assertEquals(jsonOutput, data);
String line = data;
int count = 0;
while (line != null) {
line = br.readLine();
count++;
}
assertEquals(3, count);
br.close();
tempJsonFile.deleteOnExit();
}
}