/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.parquet;

import org.apache.hadoop.conf.Configuration;

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
import org.apache.tajo.storage.Tuple;

import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Tajo implementation of {@link org.apache.parquet.hadoop.api.WriteSupport}
 * for {@link org.apache.tajo.storage.Tuple}s. Users should use
 * {@link ParquetAppender} rather than this class directly.
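 *
 * <p>A minimal usage sketch, assuming Parquet's convenience constructor
 * {@code ParquetWriter(Path, WriteSupport)}; in practice
 * {@link ParquetAppender} wires this up:
 * <pre>{@code
 *   Schema schema = ...;   // the table's Tajo schema (hypothetical)
 *   ParquetWriter<Tuple> writer =
 *       new ParquetWriter<>(path, new TajoWriteSupport(schema));
 *   writer.write(tuple);   // one call per record
 *   writer.close();
 * }</pre>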
 */
public class TajoWriteSupport extends WriteSupport<Tuple> {
  private RecordConsumer recordConsumer;
  private MessageType rootSchema;
  private Schema rootTajoSchema;

  /**
   * Creates a new TajoWriteSupport.
   *
   * @param tajoSchema The Tajo schema for the table.
   */
  public TajoWriteSupport(Schema tajoSchema) {
    this.rootSchema = new TajoSchemaConverter().convert(tajoSchema);
    this.rootTajoSchema = tajoSchema;
  }

  /**
   * Initializes the WriteSupport.
   *
   * @param configuration The job's configuration.
   * @return A WriteContext that describes how to write the file.
   */
  @Override
  public WriteContext init(Configuration configuration) {
    // No extra key/value metadata is attached to the file footer.
    Map<String, String> extraMetaData = new HashMap<>();
    return new WriteContext(rootSchema, extraMetaData);
  }

  /**
   * Called once per row group.
   *
   * @param recordConsumer The {@link org.apache.parquet.io.api.RecordConsumer} to write to.
   */
  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  /**
   * Writes a Tuple to the file.
   *
   * @param tuple The Tuple to write to the file.
   */
  @Override
  public void write(Tuple tuple) {
    recordConsumer.startMessage();
    writeRecordFields(rootSchema, rootTajoSchema, tuple);
    recordConsumer.endMessage();
  }

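  /**
   * Writes every field of the given Tuple, walking the Tajo schema and the
   * Parquet schema in parallel. NULL_TYPE columns exist only on the Tajo
   * side, so the two indexes can diverge.
   *
   * @param schema The Parquet group type corresponding to the record.
   * @param tajoSchema The Tajo schema for the record.
   * @param tuple The Tuple whose field values are written.
   */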
  private void writeRecordFields(GroupType schema, Schema tajoSchema,
                                 Tuple tuple) {
    List<Type> fields = schema.getFields();
    // NULL_TYPE columns are omitted from the Parquet schema, so the Parquet
    // field index can lag behind the Tajo column index.
    int index = 0;
    for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) {
      Column column = tajoSchema.getColumn(tajoIndex);
      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
        continue;
      }
      Type fieldType = fields.get(index);
      if (!tuple.isBlankOrNull(tajoIndex)) {
        recordConsumer.startField(fieldType.getName(), index);
        writeValue(column, tuple, tajoIndex);
        recordConsumer.endField(fieldType.getName(), index);
      } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
        // Optional fields may simply be skipped; required fields must have a value.
        throw new RuntimeException("Null-value for required field: " +
            column.getSimpleName());
      }
      ++index;
    }
  }

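  /**
   * Writes a single non-null field value, dispatching on the column's Tajo
   * data type to the matching {@link RecordConsumer} primitive call.
   *
   * @param column The Tajo column describing the field's type.
   * @param tuple The Tuple containing the value.
   * @param index The field's index within the Tuple.
   */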
  private void writeValue(Column column, Tuple tuple, int index) {
    switch (column.getDataType().getType()) {
      case BOOLEAN:
        recordConsumer.addBoolean(tuple.getBool(index));
        break;
      case BIT:
      case INT2:
      case INT4:
        // BIT and INT2 are widened to a Parquet 32-bit integer.
        recordConsumer.addInteger(tuple.getInt4(index));
        break;
      case INT8:
        recordConsumer.addLong(tuple.getInt8(index));
        break;
      case FLOAT4:
        recordConsumer.addFloat(tuple.getFloat4(index));
        break;
      case FLOAT8:
        recordConsumer.addDouble(tuple.getFloat8(index));
        break;
      case CHAR:
        // Enforce the declared CHAR(n) length before writing the value as binary.
        if (tuple.size(index) > column.getDataType().getLength()) {
          throw new ValueTooLongForTypeCharactersException(column.getDataType().getLength());
        }
        recordConsumer.addBinary(Binary.fromByteArray(tuple.getTextBytes(index)));
        break;
      case TEXT:
        recordConsumer.addBinary(Binary.fromByteArray(tuple.getTextBytes(index)));
        break;
      case PROTOBUF:
      case BLOB:
      case INET4:
      case INET6:
        // Raw byte types are written as-is.
        recordConsumer.addBinary(Binary.fromByteArray(tuple.getBytes(index)));
        break;
      default:
        // Silently dropping the value would corrupt the record: startField has
        // already been called, so an unsupported type must fail fast.
        throw new RuntimeException("Cannot convert Tajo type: " +
            column.getDataType().getType());
    }
  }
}