// blob: 68a7d7d22bd8dd1d0bfc519ca6025e3218feaada [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.pig;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
/**
 * {@link WriteSupport} implementation that writes Pig {@link Tuple}s to a Parquet
 * {@link RecordConsumer}.
 *
 * <p>The Parquet schema is derived from the Pig schema once, at construction time, and the two
 * schemas are then walked in parallel (field {@code i} of one corresponds to field {@code i} of
 * the other) while a tuple is written.
 */
public class TupleWriteSupport extends WriteSupport<Tuple> {

  private static final TupleFactory TF = TupleFactory.getInstance();
  private static final PigSchemaConverter PIG_SCHEMA_CONVERTER = new PigSchemaConverter(false);

  /** Parquet schema obtained by converting {@link #rootPigSchema}. */
  private final MessageType rootSchema;
  /** Pig schema of the tuples handed to {@link #write(Tuple)}. */
  private final Schema rootPigSchema;
  /** Destination of the records; set by {@link #prepareForWrite(RecordConsumer)}. */
  private RecordConsumer recordConsumer;

  /**
   * Builds a write support from the textual form of a Pig schema.
   *
   * @param pigSchemaString the Pig schema, in the syntax accepted by
   *        {@link Utils#getSchemaFromString(String)}
   * @return a write support for tuples of that schema
   * @throws ParserException as thrown by {@link Utils#getSchemaFromString(String)}
   */
  public static TupleWriteSupport fromPigSchema(String pigSchemaString) throws ParserException {
    return new TupleWriteSupport(Utils.getSchemaFromString(pigSchemaString));
  }

  /**
   * @param pigSchema the Pig schema of the tuples to write; it is converted to the Parquet schema
   *        used for the file
   */
  public TupleWriteSupport(Schema pigSchema) {
    super();
    this.rootSchema = PIG_SCHEMA_CONVERTER.convert(pigSchema);
    this.rootPigSchema = pigSchema;
  }

  /**
   * @return the constant name {@code "pig"}
   */
  @Override
  public String getName() {
    return "pig";
  }

  /**
   * @return the Pig schema this instance was created with
   */
  public Schema getPigSchema() {
    return rootPigSchema;
  }

  /**
   * @return the Parquet schema converted from the Pig schema
   */
  public MessageType getParquetSchema() {
    return rootSchema;
  }

  /**
   * Creates the write context: the Parquet schema plus the extra metadata entries that
   * {@code PigMetaData} contributes for the Pig schema.
   *
   * @param configuration not read by this implementation
   * @return the context describing the schema and extra metadata to write
   */
  @Override
  public WriteContext init(Configuration configuration) {
    Map<String, String> extraMetaData = new HashMap<>();
    new PigMetaData(rootPigSchema).addToMetaData(extraMetaData);
    return new WriteContext(rootSchema, extraMetaData);
  }

  /**
   * Stores the consumer that subsequent {@link #write(Tuple)} calls emit to.
   *
   * @param recordConsumer the destination for the written records
   */
  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  /**
   * Writes one tuple as one Parquet message.
   *
   * @param t the tuple to write; expected to match the Pig schema of this instance
   * @throws RuntimeException wrapping any {@link ExecException} or {@link FrontendException}
   *         raised while reading the tuple or its schema
   */
  @Override
  public void write(Tuple t) {
    try {
      recordConsumer.startMessage();
      writeTuple(rootSchema, rootPigSchema, t);
      recordConsumer.endMessage();
    } catch (ExecException | FrontendException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Writes the fields of a tuple. The caller is responsible for the enclosing
   * startMessage/startGroup and the matching end call.
   *
   * <p>Fields for which {@link Tuple#isNull(int)} is true are skipped entirely: no
   * startField/endField pair is emitted for them.
   *
   * @param schema the Parquet group describing the tuple
   * @param pigSchema the Pig schema describing the tuple, field-for-field parallel to {@code schema}
   * @param t the tuple to write
   */
  private void writeTuple(GroupType schema, Schema pigSchema, Tuple t) throws ExecException, FrontendException {
    List<Type> fields = schema.getFields();
    List<FieldSchema> pigFields = pigSchema.getFields();
    assert fields.size() == pigFields.size();
    for (int i = 0; i < fields.size(); i++) {
      if (t.isNull(i)) {
        continue;
      }
      Type fieldType = fields.get(i);
      recordConsumer.startField(fieldType.getName(), i);
      FieldSchema pigType = pigFields.get(i);
      switch (pigType.type) {
        case DataType.BAG:
          writeBag(fieldType, pigType, t, i);
          break;
        case DataType.MAP:
          writeMap(fieldType, pigType, t, i);
          break;
        default:
          writeValue(fieldType, pigType, t, i);
          break;
      }
      recordConsumer.endField(fieldType.getName(), i);
    }
  }

  /**
   * Writes the bag stored at position {@code i} of {@code t} as a group. When the bag is not
   * empty the group holds a single field (index 0) with one entry per tuple of the bag; an empty
   * bag produces an empty group.
   */
  private void writeBag(Type fieldType, FieldSchema pigType, Tuple t, int i) throws ExecException, FrontendException {
    Type bagType = fieldType.asGroupType().getType(0);
    FieldSchema pigBagInnerType = pigType.schema.getField(0);
    DataBag bag = (DataBag) t.get(i);
    recordConsumer.startGroup();
    if (bag.size() > 0) {
      recordConsumer.startField(bagType.getName(), 0);
      for (Tuple tuple : bag) {
        if (bagType.isPrimitive()) {
          // the Parquet element is a primitive: write the first field of the bag tuple directly
          writeValue(bagType, pigBagInnerType, tuple, 0);
        } else {
          recordConsumer.startGroup();
          writeTuple(bagType.asGroupType(), pigBagInnerType.schema, tuple);
          recordConsumer.endGroup();
        }
      }
      recordConsumer.endField(bagType.getName(), 0);
    }
    recordConsumer.endGroup();
  }

  /**
   * Writes the map stored at position {@code i} of {@code t} as a group. When the map is not
   * empty the group holds a single field (index 0) with one (key, value) group per entry; an
   * empty map produces an empty group.
   */
  private void writeMap(Type fieldType, FieldSchema pigType, Tuple t, int i) throws ExecException, FrontendException {
    Type mapType = fieldType.asGroupType().getType(0);
    FieldSchema pigMapInnerType = pigType.schema.getField(0);
    @SuppressWarnings("unchecked") // Tuple.get returns Object; the MAP field type is what justifies the cast
    Map<String, Object> map = (Map<String, Object>) t.get(i);
    recordConsumer.startGroup();
    if (!map.isEmpty()) {
      recordConsumer.startField(mapType.getName(), 0);
      // The key/value schema and group type are identical for every entry: build them once.
      GroupType keyValueType = mapType.asGroupType();
      Schema keyValueSchema = new Schema(Arrays.asList(
          new FieldSchema("key", DataType.CHARARRAY),
          new FieldSchema("value", pigMapInnerType.schema, pigMapInnerType.type)));
      for (Entry<String, Object> entry : map.entrySet()) {
        recordConsumer.startGroup();
        // each entry is written as a two-field tuple so that writeTuple handles null values
        writeTuple(keyValueType, keyValueSchema, TF.newTuple(Arrays.asList(entry.getKey(), entry.getValue())));
        recordConsumer.endGroup();
      }
      recordConsumer.endField(mapType.getName(), 0);
    }
    recordConsumer.endGroup();
  }

  /**
   * Writes the value at position {@code i} of {@code t}: a primitive is added to the consumer
   * according to the Parquet primitive type, anything else is written as a nested tuple group.
   *
   * @param type the Parquet type of the value
   * @param pigType the Pig type of the value
   * @param t the tuple holding the value
   * @param i the position of the value in {@code t}
   * @throws ParquetEncodingException wrapping any failure, with the position, tuple and both
   *         types in the message
   */
  private void writeValue(Type type, FieldSchema pigType, Tuple t, int i) {
    try {
      if (type.isPrimitive()) {
        switch (type.asPrimitiveType().getPrimitiveTypeName()) {
          // TODO: use PrimitiveTuple accessors
          case BINARY:
            byte[] bytes;
            if (pigType.type == DataType.BYTEARRAY) {
              bytes = ((DataByteArray) t.get(i)).get();
            } else if (pigType.type == DataType.CHARARRAY) {
              bytes = ((String) t.get(i)).getBytes(StandardCharsets.UTF_8);
            } else {
              throw new UnsupportedOperationException("can not convert from " + DataType.findTypeName(pigType.type) + " to BINARY ");
            }
            recordConsumer.addBinary(Binary.fromReusedByteArray(bytes));
            break;
          case BOOLEAN:
            recordConsumer.addBoolean((Boolean) t.get(i));
            break;
          case INT32:
            recordConsumer.addInteger(((Number) t.get(i)).intValue());
            break;
          case INT64:
            recordConsumer.addLong(((Number) t.get(i)).longValue());
            break;
          case DOUBLE:
            recordConsumer.addDouble(((Number) t.get(i)).doubleValue());
            break;
          case FLOAT:
            recordConsumer.addFloat(((Number) t.get(i)).floatValue());
            break;
          default:
            throw new UnsupportedOperationException(type.asPrimitiveType().getPrimitiveTypeName().name());
        }
      } else {
        assert pigType.type == DataType.TUPLE;
        recordConsumer.startGroup();
        writeTuple(type.asGroupType(), pigType.schema, (Tuple) t.get(i));
        recordConsumer.endGroup();
      }
    } catch (Exception e) {
      // Deliberately broad: cast and null failures are reported with the offending value's context.
      throw new ParquetEncodingException("can not write value at " + i + " in tuple " + t + " from type '" + pigType + "' to type '" + type +"'", e);
    }
  }
}