| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.orc.tools.convert; |
| |
| import com.opencsv.CSVReader; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; |
| import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.orc.RecordReader; |
| import org.apache.orc.TypeDescription; |
| import org.threeten.bp.LocalDateTime; |
| import org.threeten.bp.ZoneId; |
| import org.threeten.bp.ZonedDateTime; |
| import org.threeten.bp.format.DateTimeFormatter; |
| import org.threeten.bp.temporal.TemporalAccessor; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.sql.Timestamp; |
| |
| public class CsvReader implements RecordReader { |
| private long rowNumber = 0; |
| private final Converter converter; |
| private final int columns; |
| private final CSVReader reader; |
| private final String nullString; |
| private final FSDataInputStream underlying; |
| private final long totalSize; |
| private final DateTimeFormatter dateTimeFormatter; |
| |
| /** |
| * Create a CSV reader |
| * @param reader the stream to read from |
| * @param input the underlying file that is only used for getting the |
| * position within the file |
| * @param size the number of bytes in the underlying stream |
| * @param schema the schema to read into |
| * @param separatorChar the character between fields |
| * @param quoteChar the quote character |
| * @param escapeChar the escape character |
| * @param headerLines the number of header lines |
| * @param nullString the string that is translated to null |
| * @param timestampFormat the timestamp format string |
| */ |
| public CsvReader(java.io.Reader reader, |
| FSDataInputStream input, |
| long size, |
| TypeDescription schema, |
| char separatorChar, |
| char quoteChar, |
| char escapeChar, |
| int headerLines, |
| String nullString, |
| String timestampFormat) { |
| this.underlying = input; |
| this.reader = new CSVReader(reader, separatorChar, quoteChar, escapeChar, |
| headerLines); |
| this.nullString = nullString; |
| this.totalSize = size; |
| IntWritable nextColumn = new IntWritable(0); |
| this.converter = buildConverter(nextColumn, schema); |
| this.columns = nextColumn.get(); |
| this.dateTimeFormatter = DateTimeFormatter.ofPattern(timestampFormat); |
| } |
| |
| interface Converter { |
| void convert(String[] values, VectorizedRowBatch batch, int row); |
| void convert(String[] values, ColumnVector column, int row); |
| } |
| |
| @Override |
| public boolean nextBatch(VectorizedRowBatch batch) throws IOException { |
| batch.reset(); |
| final int BATCH_SIZE = batch.getMaxSize(); |
| String[] nextLine; |
| // Read the CSV rows and place them into the column vectors. |
| while ((nextLine = reader.readNext()) != null) { |
| rowNumber++; |
| if (nextLine.length != columns && |
| !(nextLine.length == columns + 1 && "".equals(nextLine[columns]))) { |
| throw new IllegalArgumentException("Too many columns on line " + |
| rowNumber + ". Expected " + columns + ", but got " + |
| nextLine.length + "."); |
| } |
| converter.convert(nextLine, batch, batch.size++); |
| if (batch.size == BATCH_SIZE) { |
| break; |
| } |
| } |
| return batch.size != 0; |
| } |
| |
| @Override |
| public long getRowNumber() throws IOException { |
| return rowNumber; |
| } |
| |
| @Override |
| public float getProgress() throws IOException { |
| long pos = underlying.getPos(); |
| return totalSize != 0 && pos < totalSize ? (float) pos / totalSize : 1; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| reader.close(); |
| } |
| |
| @Override |
| public void seekToRow(long rowCount) throws IOException { |
| throw new UnsupportedOperationException("Seeking not supported"); |
| } |
| |
| abstract class ConverterImpl implements Converter { |
| final int offset; |
| |
| ConverterImpl(IntWritable offset) { |
| this.offset = offset.get(); |
| offset.set(this.offset + 1); |
| } |
| |
| @Override |
| public void convert(String[] values, VectorizedRowBatch batch, int row) { |
| convert(values, batch.cols[0], row); |
| } |
| } |
| |
| class BooleanConverter extends ConverterImpl { |
| BooleanConverter(IntWritable offset) { |
| super(offset); |
| } |
| |
| @Override |
| public void convert(String[] values, ColumnVector column, int row) { |
| if (values[offset] == null || nullString.equals(values[offset])) { |
| column.noNulls = false; |
| column.isNull[row] = true; |
| } else { |
| if (values[offset].equalsIgnoreCase("true") |
| || values[offset].equalsIgnoreCase("t") |
| || values[offset].equals("1")) { |
| ((LongColumnVector) column).vector[row] = 1; |
| } else { |
| ((LongColumnVector) column).vector[row] = 0; |
| } |
| } |
| } |
| } |
| |
| class LongConverter extends ConverterImpl { |
| LongConverter(IntWritable offset) { |
| super(offset); |
| } |
| |
| @Override |
| public void convert(String[] values, ColumnVector column, int row) { |
| if (values[offset] == null || nullString.equals(values[offset])) { |
| column.noNulls = false; |
| column.isNull[row] = true; |
| } else { |
| ((LongColumnVector) column).vector[row] = |
| Long.parseLong(values[offset]); |
| } |
| } |
| } |
| |
| class DoubleConverter extends ConverterImpl { |
| DoubleConverter(IntWritable offset) { |
| super(offset); |
| } |
| |
| @Override |
| public void convert(String[] values, ColumnVector column, int row) { |
| if (values[offset] == null || nullString.equals(values[offset])) { |
| column.noNulls = false; |
| column.isNull[row] = true; |
| } else { |
| ((DoubleColumnVector) column).vector[row] = |
| Double.parseDouble(values[offset]); |
| } |
| } |
| } |
| |
| class DecimalConverter extends ConverterImpl { |
| DecimalConverter(IntWritable offset) { |
| super(offset); |
| } |
| |
| @Override |
| public void convert(String[] values, ColumnVector column, int row) { |
| if (values[offset] == null || nullString.equals(values[offset])) { |
| column.noNulls = false; |
| column.isNull[row] = true; |
| } else { |
| ((DecimalColumnVector) column).vector[row].set( |
| new HiveDecimalWritable(values[offset])); |
| } |
| } |
| } |
| |
| class BytesConverter extends ConverterImpl { |
| BytesConverter(IntWritable offset) { |
| super(offset); |
| } |
| |
| @Override |
| public void convert(String[] values, ColumnVector column, int row) { |
| if (values[offset] == null || nullString.equals(values[offset])) { |
| column.noNulls = false; |
| column.isNull[row] = true; |
| } else { |
| byte[] value = values[offset].getBytes(StandardCharsets.UTF_8); |
| ((BytesColumnVector) column).setRef(row, value, 0, value.length); |
| } |
| } |
| } |
| |
| class TimestampConverter extends ConverterImpl { |
| TimestampConverter(IntWritable offset) { |
| super(offset); |
| } |
| |
| @Override |
| public void convert(String[] values, ColumnVector column, int row) { |
| if (values[offset] == null || nullString.equals(values[offset])) { |
| column.noNulls = false; |
| column.isNull[row] = true; |
| } else { |
| TimestampColumnVector vector = (TimestampColumnVector) column; |
| TemporalAccessor temporalAccessor = |
| dateTimeFormatter.parseBest(values[offset], |
| ZonedDateTime.FROM, LocalDateTime.FROM); |
| if (temporalAccessor instanceof ZonedDateTime) { |
| ZonedDateTime zonedDateTime = ((ZonedDateTime) temporalAccessor); |
| Timestamp timestamp = new Timestamp(zonedDateTime.toEpochSecond() * 1000L); |
| timestamp.setNanos(zonedDateTime.getNano()); |
| vector.set(row, timestamp); |
| } else if (temporalAccessor instanceof LocalDateTime) { |
| ZonedDateTime tz = ((LocalDateTime) temporalAccessor).atZone(ZoneId.systemDefault()); |
| Timestamp timestamp = new Timestamp(tz.toEpochSecond() * 1000L); |
| timestamp.setNanos(tz.getNano()); |
| vector.set(row, timestamp); |
| } else { |
| column.noNulls = false; |
| column.isNull[row] = true; |
| } |
| } |
| } |
| } |
| |
| class StructConverter implements Converter { |
| final Converter[] children; |
| |
| |
| StructConverter(IntWritable offset, TypeDescription schema) { |
| children = new Converter[schema.getChildren().size()]; |
| int c = 0; |
| for(TypeDescription child: schema.getChildren()) { |
| children[c++] = buildConverter(offset, child); |
| } |
| } |
| |
| @Override |
| public void convert(String[] values, VectorizedRowBatch batch, int row) { |
| for(int c=0; c < children.length; ++c) { |
| children[c].convert(values, batch.cols[c], row); |
| } |
| } |
| |
| @Override |
| public void convert(String[] values, ColumnVector column, int row) { |
| StructColumnVector cv = (StructColumnVector) column; |
| for(int c=0; c < children.length; ++c) { |
| children[c].convert(values, cv.fields[c], row); |
| } |
| } |
| } |
| |
| Converter buildConverter(IntWritable startOffset, TypeDescription schema) { |
| switch (schema.getCategory()) { |
| case BOOLEAN: |
| return new BooleanConverter(startOffset); |
| case BYTE: |
| case SHORT: |
| case INT: |
| case LONG: |
| return new LongConverter(startOffset); |
| case FLOAT: |
| case DOUBLE: |
| return new DoubleConverter(startOffset); |
| case DECIMAL: |
| return new DecimalConverter(startOffset); |
| case BINARY: |
| case STRING: |
| case CHAR: |
| case VARCHAR: |
| return new BytesConverter(startOffset); |
| case TIMESTAMP: |
| case TIMESTAMP_INSTANT: |
| return new TimestampConverter(startOffset); |
| case STRUCT: |
| return new StructConverter(startOffset, schema); |
| default: |
| throw new IllegalArgumentException("Unhandled type " + schema); |
| } |
| } |
| } |