| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.storage.avro; |
| |
| import org.apache.avro.Schema; |
| import org.apache.avro.file.DataFileWriter; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.generic.GenericDatumWriter; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.io.DatumWriter; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.tajo.catalog.Column; |
| import org.apache.tajo.catalog.TableMeta; |
| import org.apache.tajo.catalog.statistics.TableStats; |
| import org.apache.tajo.storage.FileAppender; |
| import org.apache.tajo.storage.TableStatistics; |
| import org.apache.tajo.storage.Tuple; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| |
| /** |
| * FileAppender for writing to Avro files. |
| */ |
| public class AvroAppender extends FileAppender { |
| private TableStatistics stats; |
| private Schema avroSchema; |
| private List<Schema.Field> avroFields; |
| private DataFileWriter<GenericRecord> dataFileWriter; |
| |
| /** |
| * Creates a new AvroAppender. |
| * |
| * @param conf Configuration properties. |
| * @param schema The table schema. |
| * @param meta The table metadata. |
| * @param path The path of the Parquet file to write to. |
| */ |
| public AvroAppender(Configuration conf, |
| org.apache.tajo.catalog.Schema schema, |
| TableMeta meta, Path path) throws IOException { |
| super(conf, schema, meta, path); |
| } |
| |
| /** |
| * Initializes the Appender. |
| */ |
| public void init() throws IOException { |
| FileSystem fs = path.getFileSystem(conf); |
| if (!fs.exists(path.getParent())) { |
| throw new FileNotFoundException(path.toString()); |
| } |
| FSDataOutputStream outputStream = fs.create(path); |
| |
| avroSchema = AvroUtil.getAvroSchema(meta, conf); |
| avroFields = avroSchema.getFields(); |
| |
| DatumWriter<GenericRecord> datumWriter = |
| new GenericDatumWriter<GenericRecord>(avroSchema); |
| dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); |
| dataFileWriter.create(avroSchema, outputStream); |
| |
| if (enabledStats) { |
| this.stats = new TableStatistics(schema); |
| } |
| super.init(); |
| } |
| |
| /** |
| * Gets the current offset. Tracking offsets is currenly not implemented, so |
| * this method always returns 0. |
| * |
| * @return 0 |
| */ |
| @Override |
| public long getOffset() throws IOException { |
| return 0; |
| } |
| |
| private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { |
| if (tuple.get(i).isNull()) { |
| return null; |
| } |
| switch (avroType) { |
| case NULL: |
| return null; |
| case BOOLEAN: |
| return tuple.getBool(i); |
| case INT: |
| return tuple.getInt4(i); |
| case LONG: |
| return tuple.getInt8(i); |
| case FLOAT: |
| return tuple.getFloat4(i); |
| case DOUBLE: |
| return tuple.getFloat8(i); |
| case BYTES: |
| case FIXED: |
| return ByteBuffer.wrap(tuple.getBytes(i)); |
| case STRING: |
| return tuple.getText(i); |
| default: |
| throw new RuntimeException("Unknown primitive type."); |
| } |
| } |
| |
| /** |
| * Write a Tuple to the Avro file. |
| * |
| * @param tuple The Tuple to write. |
| */ |
| @Override |
| public void addTuple(Tuple tuple) throws IOException { |
| GenericRecord record = new GenericData.Record(avroSchema); |
| for (int i = 0; i < schema.size(); ++i) { |
| Column column = schema.getColumn(i); |
| if (enabledStats) { |
| stats.analyzeField(i, tuple.get(i)); |
| } |
| Object value; |
| Schema.Field avroField = avroFields.get(i); |
| Schema.Type avroType = avroField.schema().getType(); |
| switch (avroType) { |
| case NULL: |
| case BOOLEAN: |
| case INT: |
| case LONG: |
| case FLOAT: |
| case DOUBLE: |
| case BYTES: |
| case STRING: |
| case FIXED: |
| value = getPrimitive(tuple, i, avroType); |
| break; |
| case RECORD: |
| throw new RuntimeException("Avro RECORD not supported."); |
| case ENUM: |
| throw new RuntimeException("Avro ENUM not supported."); |
| case MAP: |
| throw new RuntimeException("Avro MAP not supported."); |
| case UNION: |
| List<Schema> schemas = avroField.schema().getTypes(); |
| if (schemas.size() != 2) { |
| throw new RuntimeException("Avro UNION not supported."); |
| } |
| if (schemas.get(0).getType().equals(Schema.Type.NULL)) { |
| value = getPrimitive(tuple, i, schemas.get(1).getType()); |
| } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { |
| value = getPrimitive(tuple, i, schemas.get(0).getType()); |
| } else { |
| throw new RuntimeException("Avro UNION not supported."); |
| } |
| break; |
| default: |
| throw new RuntimeException("Unknown type: " + avroType); |
| } |
| record.put(i, value); |
| } |
| dataFileWriter.append(record); |
| |
| if (enabledStats) { |
| stats.incrementRow(); |
| } |
| } |
| |
| /** |
| * Flushes the current state of the file. |
| */ |
| @Override |
| public void flush() throws IOException { |
| dataFileWriter.flush(); |
| } |
| |
| /** |
| * Closes the Appender. |
| */ |
| @Override |
| public void close() throws IOException { |
| dataFileWriter.close(); |
| } |
| |
| /** |
| * If table statistics is enabled, retrieve the table statistics. |
| * |
| * @return Table statistics if enabled or null otherwise. |
| */ |
| @Override |
| public TableStats getStats() { |
| if (enabledStats) { |
| return stats.getTableStat(); |
| } else { |
| return null; |
| } |
| } |
| } |