// blob: ef048f22edfc7c98a0198d6848aaad1c8a88603c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.pig;
import static org.apache.pig.builtin.mock.Storage.bag;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.apache.parquet.pig.TupleReadSupport.PARQUET_PIG_SCHEMA;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.junit.Test;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupWriter;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext;
import org.apache.parquet.io.ConverterConsumer;
import org.apache.parquet.io.RecordConsumerLoggingWrapper;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Round-trip tests between Parquet example {@link Group}s and Pig {@link Tuple}s.
 *
 * <p>Records are pushed through {@link TupleWriteSupport} and the
 * {@link RecordMaterializer} obtained from {@link TupleReadSupport}; the result
 * is compared with the input via {@code toString()}.
 */
public class TestTupleRecordConsumer {
  private static final Logger LOG = LoggerFactory.getLogger(TestTupleRecordConsumer.class);

  /** Round-trips a single group with a nested tuple and nested bags. */
  @Test
  public void testArtSchema() throws ExecException, ParserException {
    String pigSchemaString =
        "DocId:long, " +
        "Links:(Backward:{(long)}, Forward:{(long)}), " +
        "Name:{(Language:{(Code:chararray,Country:chararray)}, Url:chararray)}";
    SimpleGroup g = new SimpleGroup(getMessageType(pigSchemaString));
    // Upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit one.
    g.add("DocId", 1L);
    Group links = g.addGroup("Links");
    links.addGroup("Backward").addGroup("bag").add(0, 1L);
    links.addGroup("Forward").addGroup("bag").add(0, 1L);
    Group name = g.addGroup("Name").addGroup("bag");
    name.addGroup("Language").addGroup("bag").append("Code", "en").append("Country", "US");
    name.add("Url", "http://foo/bar");
    testFromGroups(pigSchemaString, Arrays.<Group>asList(g));
  }

  /** Round-trips a group holding a bag with two single-field entries. */
  @Test
  public void testBags() throws ExecException, ParserException {
    String pigSchemaString = "a: {(b: chararray)}";
    SimpleGroup g = new SimpleGroup(getMessageType(pigSchemaString));
    Group addGroup = g.addGroup("a");
    addGroup.addGroup("bag").append("b", "foo");
    addGroup.addGroup("bag").append("b", "bar");
    testFromGroups(pigSchemaString, Arrays.<Group>asList(g));
  }

  /** Round-trips a group holding a map with two key/value entries. */
  @Test
  public void testMaps() throws ExecException, ParserException {
    String pigSchemaString = "a: [(b: chararray)]";
    SimpleGroup g = new SimpleGroup(getMessageType(pigSchemaString));
    Group map = g.addGroup("a");
    map.addGroup("map").append("key", "foo").addGroup("value").append("b", "foo");
    map.addGroup("map").append("key", "bar").addGroup("value").append("b", "bar");
    testFromGroups(pigSchemaString, Arrays.<Group>asList(g));
  }

  /** Round-trips tuples containing bags, including null fields and a null bag. */
  @Test
  public void testComplexSchema() throws Exception {
    String pigSchemaString = "a:chararray, b:{t:(c:chararray, d:chararray)}";
    Tuple t0 = tuple("a"+0, bag(tuple("o", "b"), tuple("o1", "b1")));
    Tuple t1 = tuple("a"+1, bag(tuple("o", "b"), tuple("o", "b"), tuple("o", "b"), tuple("o", "b")));
    Tuple t2 = tuple("a"+2, bag(tuple("o", "b"), tuple("o", null), tuple(null, "b"), tuple(null, null)));
    Tuple t3 = tuple("a"+3, null);
    testFromTuple(pigSchemaString, Arrays.asList(t0, t1, t2, t3));
  }

  /**
   * Round-trips tuples containing a single-entry map, plus a null map.
   *
   * <p>Each map holds exactly one entry under the key {@code "foo"}. Earlier
   * revisions of this test issued several {@code put("foo", ...)} calls per
   * map; because the key was always the same, only the last value survived.
   * The maps below spell out that effective content directly.
   *
   * <p>NOTE(review): distinct keys were presumably intended. Adding them may
   * make the {@code toString()} comparison in {@link #testFromTuple} depend on
   * map iteration order - confirm before extending the coverage.
   */
  @Test
  public void testMapSchema() throws Exception {
    String pigSchemaString = "a:chararray, b:[(c:chararray, d:chararray)]";
    Tuple t0 = tuple("a"+0, singleEntryMap("foo", tuple("o", "b")));
    Tuple t1 = tuple("a"+1, singleEntryMap("foo", tuple("o", "b")));
    Tuple t2 = tuple("a"+2, singleEntryMap("foo", tuple(null, null)));
    Tuple t3 = tuple("a"+3, null);
    testFromTuple(pigSchemaString, Arrays.asList(t0, t1, t2, t3));
  }

  /**
   * Builds a mutable, properly typed map holding one entry.
   *
   * @param key the only key of the map
   * @param value the value stored under {@code key}
   * @return a new {@link HashMap} containing just that entry
   */
  private static Map<String, Object> singleEntryMap(String key, Object value) {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put(key, value);
    return map;
  }

  /**
   * Writes every input tuple through a {@link TupleWriteSupport} wired to the
   * Pig record materializer and checks that what is read back prints the same.
   *
   * @param pigSchemaString Pig schema describing the tuples
   * @param input tuples to round-trip
   */
  private void testFromTuple(String pigSchemaString, List<Tuple> input) throws Exception {
    List<Tuple> tuples = new ArrayList<Tuple>();
    RecordMaterializer<Tuple> recordConsumer = newPigRecordConsumer(pigSchemaString);
    TupleWriteSupport tupleWriter = newTupleWriter(pigSchemaString, recordConsumer);
    for (Tuple in : input) {
      LOG.debug("{}", in);
      tupleWriter.write(in);
      tuples.add(recordConsumer.getCurrentRecord());
    }
    assertEquals(input.size(), tuples.size());
    for (int i = 0; i < input.size(); i++) {
      // The message identifies which record diverged when the assertion fails.
      assertEquals("tuple #" + i, input.get(i).toString(), tuples.get(i).toString());
    }
  }

  /**
   * Converts the input groups to Pig tuples, converts those tuples back to
   * groups, and checks that the resulting groups print the same as the input.
   *
   * @param pigSchemaString Pig schema describing the groups
   * @param input groups to round-trip
   */
  private void testFromGroups(String pigSchemaString, List<Group> input) throws ParserException {
    // Step 1: Group -> Tuple, through the Pig record materializer.
    List<Tuple> tuples = new ArrayList<Tuple>();
    MessageType schema = getMessageType(pigSchemaString);
    RecordMaterializer<Tuple> pigRecordConsumer = newPigRecordConsumer(pigSchemaString);
    GroupWriter groupWriter = new GroupWriter(
        new RecordConsumerLoggingWrapper(
            new ConverterConsumer(pigRecordConsumer.getRootConverter(), schema)),
        schema);
    for (Group group : input) {
      groupWriter.write(group);
      final Tuple tuple = pigRecordConsumer.getCurrentRecord();
      tuples.add(tuple);
      LOG.debug("in: {}\nout:{}", group, tuple);
    }

    // Step 2: Tuple -> Group, through the tuple write support.
    List<Group> groups = new ArrayList<Group>();
    GroupRecordConverter recordConsumer = new GroupRecordConverter(schema);
    TupleWriteSupport tupleWriter = newTupleWriter(pigSchemaString, recordConsumer);
    for (Tuple t : tuples) {
      LOG.debug("{}", t);
      tupleWriter.write(t);
      groups.add(recordConsumer.getCurrentRecord());
    }

    // Step 3: the groups read back must print exactly like the originals.
    assertEquals(input.size(), groups.size());
    for (int i = 0; i < input.size(); i++) {
      Group in = input.get(i);
      LOG.debug("{}", in);
      Group out = groups.get(i);
      assertEquals("group #" + i, in.toString(), out.toString());
    }
  }

  /**
   * Creates a {@link TupleWriteSupport} for the schema, initialized and
   * prepared to write into the root converter of the given materializer.
   *
   * @param pigSchemaString Pig schema the writer is built from
   * @param recordConsumer materializer whose root converter receives the records
   * @return the ready-to-use write support
   */
  private <T> TupleWriteSupport newTupleWriter(String pigSchemaString, RecordMaterializer<T> recordConsumer) throws ParserException {
    TupleWriteSupport tupleWriter = TupleWriteSupport.fromPigSchema(pigSchemaString);
    tupleWriter.init(null);
    tupleWriter.prepareForWrite(
        new ConverterConsumer(recordConsumer.getRootConverter(), tupleWriter.getParquetSchema())
    );
    return tupleWriter;
  }

  /**
   * Collects the key/value metadata that {@link PigMetaData} adds for the schema.
   *
   * @param pigSchemaString Pig schema handed to {@link PigMetaData}
   * @return a new map populated by {@code addToMetaData}
   */
  private Map<String, String> pigMetaData(String pigSchemaString) {
    Map<String, String> map = new HashMap<String, String>();
    new PigMetaData(pigSchemaString).addToMetaData(map);
    return map;
  }

  /**
   * Builds the Pig {@link RecordMaterializer} by running
   * {@link TupleReadSupport} through {@code init} and {@code prepareForRead}.
   *
   * @param pigSchemaString Pig schema, also stored in the configuration under
   *     {@code PARQUET_PIG_SCHEMA}
   * @return the materializer producing Pig tuples
   */
  private RecordMaterializer<Tuple> newPigRecordConsumer(String pigSchemaString) throws ParserException {
    TupleReadSupport tupleReadSupport = new TupleReadSupport();
    final Configuration configuration = new Configuration(false);
    MessageType parquetSchema = getMessageType(pigSchemaString);
    final Map<String, String> pigMetaData = pigMetaData(pigSchemaString);
    // InitContext takes a set of values per key; wrap each single value in a set.
    Map<String, Set<String>> globalMetaData = new HashMap<String, Set<String>>();
    for (Entry<String, String> entry : pigMetaData.entrySet()) {
      globalMetaData.put(entry.getKey(), new HashSet<String>(Arrays.asList(entry.getValue())));
    }
    configuration.set(PARQUET_PIG_SCHEMA, pigSchemaString);
    final ReadContext init = tupleReadSupport.init(new InitContext(configuration, globalMetaData, parquetSchema));
    return tupleReadSupport.prepareForRead(configuration, pigMetaData, parquetSchema, init);
  }

  /**
   * Parses the Pig schema string and converts it to a Parquet {@link MessageType}.
   *
   * @param pigSchemaString Pig schema in textual form
   * @return the schema produced by {@link PigSchemaConverter}
   */
  private MessageType getMessageType(String pigSchemaString) throws ParserException {
    Schema pigSchema = Utils.getSchemaFromString(pigSchemaString);
    return new PigSchemaConverter().convert(pigSchema);
  }
}