blob: 80b8fe2e7c433bd59e8a9069737cdebbc22a4182 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.sas;
import com.epam.parso.Column;
import com.epam.parso.ColumnFormat;
import com.epam.parso.SasFileProperties;
import com.epam.parso.SasFileReader;
import com.epam.parso.impl.DateTimeConstants;
import com.epam.parso.impl.SasFileReaderImpl;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class SasBatchReader implements ManagedReader {
// Class-wide SLF4J logger; also handed to UserException.build() when errors are raised.
private static final Logger logger = LoggerFactory.getLogger(SasBatchReader.class);
// Column writers in schema order; populated by buildWriterList().
private final List<SasColumnWriter> writerList;
// Descriptor of the file being read, obtained from the schema negotiator.
private final FileDescrip file;
// Input stream for the file; opened in openFile() and released in close().
private InputStream fsStream;
// Parso reader constructed on top of fsStream in openFile().
private SasFileReader sasFileReader;
// Error context from the negotiator; attached to the UserExceptions built in this class.
private final CustomErrorContext errorContext;
// Row writer obtained from the ResultSetLoader built by the negotiator.
private final RowSetLoader rowWriter;
// First data row, read eagerly in openFile() and consumed by the first processNextRow() call.
private Object[] firstRow;
// File-level metadata copied from SasFileProperties in populateMetadata().
private String compressionMethod;
private String fileLabel;
private String fileType;
private String osName;
private String osType;
private String sasRelease;
private String sessionEncoding;
private String serverType;
private LocalDate dateCreated;
private LocalDate dateModified;
/**
 * Names of the implicit VARCHAR metadata columns. The declaration order is significant:
 * the columns are appended to the schema by iterating {@code values()}.
 */
private enum IMPLICIT_STRING_COLUMN {
  COMPRESSION_METHOD("_compression_method"),
  ENCODING("_encoding"),
  FILE_LABEL("_file_label"),
  FILE_TYPE("_file_type"),
  OS_NAME("_os_name"),
  OS_TYPE("_os_type"),
  SAS_RELEASE("_sas_release"),
  SESSION_ENCODING("_session_encoding");

  /** Column name as it appears in the schema. */
  private final String fieldName;

  IMPLICIT_STRING_COLUMN(String name) {
    fieldName = name;
  }

  /**
   * @return the schema column name for this implicit column
   */
  public String getFieldName() {
    return this.fieldName;
  }
}
/**
 * Names of the implicit DATE metadata columns. The declaration order is significant:
 * the columns are appended to the schema by iterating {@code values()}.
 */
private enum IMPLICIT_DATE_COLUMN {
  CREATED_DATE("_date_created"),
  MODIFIED_DATE("_date_modified");

  /** Column name as it appears in the schema. */
  private final String fieldName;

  IMPLICIT_DATE_COLUMN(String name) {
    fieldName = name;
  }

  /**
   * @return the schema column name for this implicit column
   */
  public String getFieldName() {
    return this.fieldName;
  }
}
/**
 * Opens the SAS file, settles the table schema with the negotiator and prepares one
 * column writer per schema column.
 *
 * <p>The schema is the provided schema when the negotiator has one, otherwise it is derived
 * from the SAS column metadata; in both cases the implicit metadata columns are appended.
 *
 * <p>The input stream is opened by {@link #openFile()} before the schema is negotiated. If
 * any later step throws, the stream is closed here before the exception is rethrown, so a
 * failed construction does not leave the stream open.
 *
 * @param negotiator schema negotiator supplying the file, error context and optional
 *                   provided schema
 */
public SasBatchReader(FileSchemaNegotiator negotiator) {
  writerList = new ArrayList<>();
  file = negotiator.file();
  errorContext = negotiator.parentErrorContext();
  openFile();

  try {
    TupleMetadata schema;
    if (negotiator.providedSchema() != null) {
      schema = negotiator.providedSchema();
    } else {
      schema = buildSchema();
    }
    schema = addImplicitColumnsToSchema(schema);
    negotiator.tableSchema(schema, true);
    ResultSetLoader loader = negotiator.build();
    rowWriter = loader.writer();
    buildWriterList(schema);
  } catch (RuntimeException e) {
    // The stream was opened above; release it before propagating the failure.
    AutoCloseables.closeSilently(fsStream);
    throw e;
  }
}
/**
 * Opens the (possibly compressed) file stream, wraps it in a Parso reader and eagerly reads
 * the first row into {@code firstRow}.
 *
 * <p>If anything fails after the stream has been opened, the stream is closed before the
 * error is propagated.
 *
 * @throws UserException (data read error) when an {@link IOException} occurs
 */
private void openFile() {
  try {
    fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
    sasFileReader = new SasFileReaderImpl(fsStream);
    firstRow = sasFileReader.readNext();
  } catch (IOException e) {
    // closeSilently is also used by close(); fsStream may still be null here if the open failed.
    AutoCloseables.closeSilently(fsStream);
    throw UserException
      .dataReadError(e)
      .message("Unable to open SAS File %s", file.split().getPath())
      .addContext(e.getMessage())
      .addContext(errorContext)
      .build(logger);
  } catch (RuntimeException e) {
    // Unchecked failures keep their original type; only the stream is cleaned up.
    AutoCloseables.closeSilently(fsStream);
    throw e;
  }
}
/**
 * Derives the table schema from the SAS column metadata. Every column is added as nullable.
 * The column format name is checked against the Parso time, date and datetime format tables
 * first; when none matches, the type is resolved from the column's Java type name via
 * {@link #getType(String)}.
 *
 * @return schema built from the SAS file's columns
 * @throws UserException (data read error) if resolving or adding a column fails
 */
private TupleMetadata buildSchema() {
  SchemaBuilder schemaBuilder = new SchemaBuilder();
  for (Column sasColumn : sasFileReader.getColumns()) {
    String columnName = sasColumn.getName();
    String columnType = sasColumn.getType().getSimpleName();
    ColumnFormat format = sasColumn.getFormat();
    try {
      schemaBuilder.addNullable(columnName, resolveColumnType(format.getName(), columnType));
    } catch (Exception e) {
      throw UserException.dataReadError()
        .message("Error with type of column " + columnName + "; Type: " + columnType)
        .addContext(errorContext)
        .build(logger);
    }
  }
  return schemaBuilder.buildSchema();
}

/** Chooses the Drill type from the SAS format name, falling back to the Java type name. */
private MinorType resolveColumnType(String formatName, String javaTypeName) {
  if (DateTimeConstants.TIME_FORMAT_STRINGS.contains(formatName)) {
    return MinorType.TIME;
  }
  if (DateTimeConstants.DATE_FORMAT_STRINGS.containsKey(formatName)) {
    return MinorType.DATE;
  }
  if (DateTimeConstants.DATETIME_FORMAT_STRINGS.containsKey(formatName)) {
    return MinorType.TIMESTAMP;
  }
  return getType(javaTypeName);
}
/**
 * Creates one column writer per field of the given schema, in schema order, and appends it
 * to {@code writerList}. The writer's index is the field's position in the schema.
 *
 * @param schema final table schema, including the implicit columns
 * @throws UserException (data read error) for a field whose type has no writer
 */
private void buildWriterList(TupleMetadata schema) {
  int position = 0;
  for (MaterializedField column : schema.toFieldList()) {
    String name = column.getName();
    MinorType minorType = column.getType().getMinorType();
    SasColumnWriter columnWriter;
    switch (minorType) {
      case FLOAT8:
        columnWriter = new DoubleSasColumnWriter(position, name, rowWriter);
        break;
      case DATE:
        columnWriter = new DateSasColumnWriter(position, name, rowWriter);
        break;
      case TIME:
        columnWriter = new TimeSasColumnWriter(position, name, rowWriter);
        break;
      case VARCHAR:
        columnWriter = new StringSasColumnWriter(position, name, rowWriter);
        break;
      case TIMESTAMP:
        columnWriter = new TimestampSasColumnWriter(position, name, rowWriter);
        break;
      default:
        throw UserException.dataReadError()
          .message(name + " is an unparsable data type: " + minorType.name() + ". The SAS reader does not support this data type.")
          .addContext(errorContext)
          .build(logger);
    }
    writerList.add(columnWriter);
    position++;
  }
}
/**
 * Maps the simple name of a SAS column's Java type to a Drill minor type.
 *
 * @param simpleType simple class name of the column type, e.g. {@code "String"}
 * @return VARCHAR for strings, FLOAT8 for the numeric names, DATE for dates
 * @throws UserException (data read error) for any other type name
 */
private MinorType getType(String simpleType) {
  final MinorType drillType;
  switch (simpleType) {
    case "String":
      drillType = MinorType.VARCHAR;
      break;
    case "Double":
    case "Number":
    case "Numeric":
    case "Long":
      drillType = MinorType.FLOAT8;
      break;
    case "Date":
      drillType = MinorType.DATE;
      break;
    default:
      throw UserException.dataReadError()
        .message("SAS Reader does not support data type: " + simpleType)
        .addContext(errorContext)
        .build(logger);
  }
  return drillType;
}
/**
 * Returns a copy of the given schema with the implicit metadata columns appended: first the
 * VARCHAR columns, then the DATE columns, each in enum declaration order. All implicit
 * columns are optional and flagged to be excluded from wildcard expansion.
 *
 * <p>As a side effect, the file-level metadata fields of this reader are populated from the
 * SAS file properties.
 *
 * @param schema schema of the data columns
 * @return schema of the data columns followed by the implicit columns
 */
private TupleMetadata addImplicitColumnsToSchema(TupleMetadata schema) {
  SchemaBuilder extended = new SchemaBuilder();
  extended.addAll(schema);

  // String metadata columns
  for (IMPLICIT_STRING_COLUMN stringColumn : IMPLICIT_STRING_COLUMN.values()) {
    addImplicitColumn(extended, stringColumn.getFieldName(), MinorType.VARCHAR);
  }

  // Date metadata columns
  for (IMPLICIT_DATE_COLUMN dateColumn : IMPLICIT_DATE_COLUMN.values()) {
    addImplicitColumn(extended, dateColumn.getFieldName(), MinorType.DATE);
  }

  populateMetadata(sasFileReader.getSasFileProperties());
  return extended.build();
}

/** Appends one optional column that is excluded from wildcard expansion. */
private void addImplicitColumn(SchemaBuilder target, String name, MinorType type) {
  ColumnMetadata implicitColumn = MetadataUtils.newScalar(name, type, DataMode.OPTIONAL);
  implicitColumn.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
  target.add(implicitColumn);
}
/**
 * Loads rows until either the row writer reports it is full or the input is exhausted.
 *
 * @return {@code true} if the loop stopped because the writer is full, {@code false} if a
 *         row could not be read because the input is exhausted
 */
@Override
public boolean next() {
  boolean rowLoaded = true;
  while (rowLoaded && !rowWriter.isFull()) {
    rowLoaded = processNextRow();
  }
  return rowLoaded;
}
/**
 * Releases the file input stream by delegating to {@code AutoCloseables.closeSilently}.
 */
@Override
public void close() {
AutoCloseables.closeSilently(fsStream);
}
/**
 * Reads one row and writes it, followed by the implicit metadata columns, to the row writer.
 * The row that was read eagerly when the file was opened is used first; after that, rows
 * come from the SAS reader.
 *
 * @return {@code true} if a row was written, {@code false} if there are no more rows
 * @throws UserException (data read error, with the {@link IOException} as its cause) if the
 *         underlying read fails
 */
private boolean processNextRow() {
  final Object[] row;
  try {
    if (firstRow != null) {
      // Consume the row buffered by openFile().
      row = firstRow;
      firstRow = null;
    } else {
      row = sasFileReader.readNext();
    }
  } catch (IOException e) {
    // Pass the exception as the cause so the stack trace is not lost.
    throw UserException.dataReadError(e)
      .message("Error reading SAS file: %s", e.getMessage())
      .addContext(errorContext)
      .build(logger);
  }

  if (row == null) {
    return false;
  }

  rowWriter.start();
  for (int i = 0; i < row.length; i++) {
    writerList.get(i).load(row);
  }
  // The implicit metadata writers follow the data column writers.
  writeMetadata(row.length);
  rowWriter.save();
  return true;
}
/**
 * Copies the file-level metadata from the SAS file properties into this reader's fields so
 * it can be written on every row.
 *
 * @param fileProperties properties reported by the SAS reader for the open file
 */
private void populateMetadata(SasFileProperties fileProperties) {
  compressionMethod = fileProperties.getCompressionMethod();
  fileLabel = fileProperties.getFileLabel();
  fileType = fileProperties.getFileType();
  osName = fileProperties.getOsName();
  osType = fileProperties.getOsType();
  sasRelease = fileProperties.getSasRelease();
  sessionEncoding = fileProperties.getSessionEncoding();
  serverType = fileProperties.getServerType();
  dateCreated = convertDateToLocalDate(fileProperties.getDateCreated());
  // Previously read getDateCreated() here, so the modified date always equalled the created date.
  dateModified = convertDateToLocalDate(fileProperties.getDateModified());
}
/**
 * Writes the implicit metadata columns for the current row.
 *
 * <p>The values are written in the order the implicit columns are declared in
 * {@code IMPLICIT_STRING_COLUMN} and then {@code IMPLICIT_DATE_COLUMN}, because that is the
 * order in which the columns are appended to the schema and their writers are created.
 * The second string column is {@code _encoding}, so the file encoding is written there;
 * previously the file label was written into it and every following string column was
 * shifted by one. There is no implicit column for the server type, so it is not written.
 *
 * @param startIndex index in {@code writerList} of the first implicit column writer
 */
private void writeMetadata(int startIndex) {
  int index = startIndex;

  // String columns, in IMPLICIT_STRING_COLUMN declaration order.
  ((StringSasColumnWriter) writerList.get(index++)).load(compressionMethod);
  ((StringSasColumnWriter) writerList.get(index++)).load(sasFileReader.getSasFileProperties().getEncoding());
  ((StringSasColumnWriter) writerList.get(index++)).load(fileLabel);
  ((StringSasColumnWriter) writerList.get(index++)).load(fileType);
  ((StringSasColumnWriter) writerList.get(index++)).load(osName);
  ((StringSasColumnWriter) writerList.get(index++)).load(osType);
  ((StringSasColumnWriter) writerList.get(index++)).load(sasRelease);
  ((StringSasColumnWriter) writerList.get(index++)).load(sessionEncoding);

  // Date columns, in IMPLICIT_DATE_COLUMN declaration order.
  ((DateSasColumnWriter) writerList.get(index++)).load(dateCreated);
  ((DateSasColumnWriter) writerList.get(index)).load(dateModified);
}
/**
 * Converts a {@link Date} to the calendar date it falls on at a zero UTC offset.
 *
 * @param date instant to convert; must not be null
 * @return the date of that instant at offset zero
 */
private static LocalDate convertDateToLocalDate(Date date) {
  Instant moment = date.toInstant();
  return moment.atZone(ZoneOffset.UTC).toLocalDate();
}
/**
 * Base class for the per-column writers. Each writer holds the index of its column within a
 * row array, the column name, and the scalar writer the value is written to.
 */
public abstract static class SasColumnWriter {
  /** Name of the column this writer targets. */
  final String columnName;
  /** Scalar writer that receives the column's values. */
  final ScalarWriter writer;
  /** Position of this column's value within a row array. */
  final int columnIndex;

  /**
   * @param columnIndex position of the column's value within a row array
   * @param columnName  name of the column
   * @param writer      scalar writer receiving the values
   */
  public SasColumnWriter(int columnIndex, String columnName, ScalarWriter writer) {
    this.writer = writer;
    this.columnName = columnName;
    this.columnIndex = columnIndex;
  }

  /**
   * Writes this column's value from the given row.
   *
   * @param row the full row; the writer reads the element at its own column index
   */
  public abstract void load (Object[] row);
}
/**
 * Writes values to a string column, either from a row array or from a ready-made string.
 */
public static class StringSasColumnWriter extends SasColumnWriter {
  StringSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
    super(columnIndex, columnName, rowWriter.scalar(columnName));
  }

  /**
   * Writes the {@code toString()} form of the row's value; a null value is not written.
   */
  @Override
  public void load(Object[] row) {
    Object cell = row[columnIndex];
    if (cell == null) {
      return;
    }
    writer.setString(cell.toString());
  }

  /**
   * Writes the given string; a null or empty string is not written.
   */
  public void load (String value) {
    if (Strings.isNullOrEmpty(value)) {
      return;
    }
    writer.setString(value);
  }
}
/**
 * Writes values to a date column, either from a row array holding {@link Date} objects or
 * from a ready-made {@link LocalDate}.
 */
public static class DateSasColumnWriter extends SasColumnWriter {
  DateSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
    super(columnIndex, columnName, rowWriter.scalar(columnName));
  }

  /**
   * Converts the row's {@link Date} value to a {@link LocalDate} and writes it; a null
   * value is not written.
   */
  @Override
  public void load(Object[] row) {
    Object cell = row[columnIndex];
    if (cell == null) {
      return;
    }
    writer.setDate(convertDateToLocalDate((Date) cell));
  }

  /**
   * Writes the given date as-is.
   */
  public void load(LocalDate date) {
    writer.setDate(date);
  }
}
/**
 * Writes values to a time column. The row value is treated as a number of seconds since
 * midnight.
 */
public static class TimeSasColumnWriter extends SasColumnWriter {
  TimeSasColumnWriter(int columnIndex, String columnName, RowSetLoader rowWriter) {
    super(columnIndex, columnName, rowWriter.scalar(columnName));
  }

  /**
   * Converts the row's value to a {@link LocalTime} and writes it.
   *
   * <p>A null value is not written, matching the other column writers (previously a null
   * caused a {@code NullPointerException}). Any {@link Number} is accepted rather than only
   * {@code Long}; a fractional part is discarded.
   *
   * @throws java.time.DateTimeException if the value is outside one day (0 to 86399 seconds)
   */
  @Override
  public void load(Object[] row) {
    Object cell = row[columnIndex];
    if (cell == null) {
      return;
    }
    long secondOfDay = ((Number) cell).longValue();
    // Direct conversion; replaces formatting an HH:mm:ss string and parsing it back.
    writer.setTime(LocalTime.ofSecondOfDay(secondOfDay));
  }
}
/**
 * Writes numeric values to a double column.
 */
public static class DoubleSasColumnWriter extends SasColumnWriter {
  DoubleSasColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
    super(columnIndex, columnName, rowWriter.scalar(columnName));
  }

  /**
   * Writes the row's value as a double when it is a {@link Number}; null and non-numeric
   * values are not written.
   */
  @Override
  public void load(Object[] row) {
    Object cell = row[columnIndex];
    // instanceof is false for null, so this also covers the missing-value case.
    if (cell instanceof Number) {
      Number numeric = (Number) cell;
      writer.setDouble(numeric.doubleValue());
    }
  }
}
/**
 * Writes values to a timestamp column from row arrays holding {@link Date} objects.
 */
public static class TimestampSasColumnWriter extends SasColumnWriter {
  TimestampSasColumnWriter(int columnIndex, String columnName, RowSetLoader rowWriter) {
    super(columnIndex, columnName, rowWriter.scalar(columnName));
  }

  /**
   * Writes the instant of the row's {@link Date} value; a null value is not written.
   */
  @Override
  public void load(Object[] row) {
    Object cell = row[columnIndex];
    if (cell == null) {
      return;
    }
    Date timestamp = (Date) cell;
    writer.setTimestamp(timestamp.toInstant());
  }
}
}