blob: 56e649210f632441be15588b68206786e829deb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.converter.parquet;
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.BinaryValue;
import org.apache.parquet.example.data.simple.BooleanValue;
import org.apache.parquet.example.data.simple.DoubleValue;
import org.apache.parquet.example.data.simple.FloatValue;
import org.apache.parquet.example.data.simple.Int96Value;
import org.apache.parquet.example.data.simple.IntegerValue;
import org.apache.parquet.example.data.simple.LongValue;
import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.example.data.simple.Primitive;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
/**
* Custom Implementation of {@link Group} to support adding {@link Object} of type {@link Primitive} or {@link Group}.
* Also provides methods to add {@link Primitive} and {@link Group} with {@link String} key if index is not known.
* @author tilakpatidar
*/
public class ParquetGroup extends Group {
private final GroupType schema;
//each item represents data of a field, which is indexed by the fieldIndex of the schema
private final List<Object>[] data;
public ParquetGroup(GroupType schema) {
this.schema = schema;
this.data = new List[schema.getFields().size()];
for (int i = 0; i < schema.getFieldCount(); ++i) {
this.data[i] = new ArrayList();
}
}
public String toString() {
return this.toString("");
}
public String toString(String indent) {
StringBuilder result = new StringBuilder();
int i = 0;
for (Type field : this.schema.getFields()) {
String name = field.getName();
List<Object> values = this.data[i];
for (Object value : values) {
result.append(indent).append(name);
if (value == null) {
result.append(": NULL\n");
} else if (value instanceof Group) {
result.append("\n").append(((ParquetGroup) value).toString(indent + " "));
} else {
result.append(": ").append(value.toString()).append("\n");
}
}
i++;
}
return result.toString();
}
@Override
public Group addGroup(int fieldIndex) {
ParquetGroup g = new ParquetGroup(this.schema.getType(fieldIndex).asGroupType());
this.data[fieldIndex].add(g);
return g;
}
public Group getGroup(int fieldIndex, int index) {
return (Group) this.getValue(fieldIndex, index);
}
private Object getValue(int fieldIndex, int index) {
List<Object> list;
try {
list = this.data[fieldIndex];
} catch (IndexOutOfBoundsException var6) {
throw new RuntimeException(
"not found " + fieldIndex + "(" + this.schema.getFieldName(fieldIndex) + ") in group:\n" + this);
}
try {
return list.get(index);
} catch (IndexOutOfBoundsException var5) {
throw new RuntimeException(
"not found " + fieldIndex + "(" + this.schema.getFieldName(fieldIndex) + ") element number " + index
+ " in group:\n" + this);
}
}
public void add(int fieldIndex, Primitive value) {
Type type = this.schema.getType(fieldIndex);
List<Object> list = this.data[fieldIndex];
if (!type.isRepetition(REPEATED) && !list.isEmpty()) {
throw new IllegalStateException(
"field " + fieldIndex + " (" + type.getName() + ") can not have more than one value: " + list);
} else {
list.add(value);
}
}
public int getFieldRepetitionCount(int fieldIndex) {
List<Object> list = this.data[fieldIndex];
return list == null ? 0 : list.size();
}
public String getValueToString(int fieldIndex, int index) {
return String.valueOf(this.getValue(fieldIndex, index));
}
public String getString(int fieldIndex, int index) {
return ((BinaryValue) this.getValue(fieldIndex, index)).getString();
}
public int getInteger(int fieldIndex, int index) {
return ((IntegerValue) this.getValue(fieldIndex, index)).getInteger();
}
@Override
public long getLong(int fieldIndex, int index) {
return ((LongValue) this.getValue(fieldIndex, index)).getLong();
}
@Override
public double getDouble(int fieldIndex, int index) {
return ((DoubleValue) this.getValue(fieldIndex, index)).getDouble();
}
@Override
public float getFloat(int fieldIndex, int index) {
return ((FloatValue) this.getValue(fieldIndex, index)).getFloat();
}
public boolean getBoolean(int fieldIndex, int index) {
return ((BooleanValue) this.getValue(fieldIndex, index)).getBoolean();
}
public Binary getBinary(int fieldIndex, int index) {
return ((BinaryValue) this.getValue(fieldIndex, index)).getBinary();
}
public Binary getInt96(int fieldIndex, int index) {
return ((Int96Value) this.getValue(fieldIndex, index)).getInt96();
}
public void add(int fieldIndex, int value) {
this.add(fieldIndex, new IntegerValue(value));
}
public void add(int fieldIndex, long value) {
this.add(fieldIndex, new LongValue(value));
}
public void add(int fieldIndex, String value) {
this.add(fieldIndex, new BinaryValue(Binary.fromString(value)));
}
public void add(int fieldIndex, NanoTime value) {
this.add(fieldIndex, value.toInt96());
}
public void add(int fieldIndex, boolean value) {
this.add(fieldIndex, new BooleanValue(value));
}
public void add(int fieldIndex, Binary value) {
switch (this.getType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) {
case BINARY:
this.add(fieldIndex, new BinaryValue(value));
break;
case INT96:
this.add(fieldIndex, new Int96Value(value));
break;
default:
throw new UnsupportedOperationException(
this.getType().asPrimitiveType().getName() + " not supported for Binary");
}
}
public void add(int fieldIndex, float value) {
this.add(fieldIndex, new FloatValue(value));
}
public void add(int fieldIndex, double value) {
this.add(fieldIndex, new DoubleValue(value));
}
@Override
public void add(int i, Group group) {
this.data[i].add(group);
}
public GroupType getType() {
return this.schema;
}
public void writeValue(int field, int index, RecordConsumer recordConsumer) {
((Primitive) this.getValue(field, index)).writeValue(recordConsumer);
}
/**
* Add any object of {@link PrimitiveType} or {@link Group} type with a String key.
* @param key
* @param object
*/
public void add(String key, Object object) {
int fieldIndex = getIndex(key);
if (object.getClass() == ParquetGroup.class) {
this.addGroup(key, (Group) object);
} else {
this.add(fieldIndex, (Primitive) object);
}
}
private int getIndex(String key) {
return getType().getFieldIndex(key);
}
/**
* Add a {@link Group} given a String key.
* @param key
* @param object
*/
private void addGroup(String key, Group object) {
int fieldIndex = getIndex(key);
this.schema.getType(fieldIndex).asGroupType();
this.data[fieldIndex].add(object);
}
}