blob: 326e19f38388ffd3f693123b0eec5080406dba13 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.excel;
import com.github.pjfanning.xlsx.StreamingReader;
import com.github.pjfanning.xlsx.impl.StreamingWorkbook;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.poi.ooxml.POIXMLProperties.CoreProperties;
import org.apache.poi.openxml4j.opc.ZipPackage;
import org.apache.poi.openxml4j.util.ZipInputStreamZipEntrySource;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellType;
import org.apache.poi.ss.usermodel.DateUtil;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ExcelBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(ExcelBatchReader.class);
private static final String SAFE_WILDCARD = "_$";
private static final String SAFE_SEPARATOR = "_";
private static final String PARSER_WILDCARD = ".*";
private static final String HEADER_NEW_LINE_REPLACEMENT = "__";
private static final String MISSING_FIELD_NAME_HEADER = "field_";
private enum IMPLICIT_STRING_COLUMN {
CATEGORY("_category"),
CONTENT_STATUS("_content_status"),
CONTENT_TYPE("_content_type"),
CREATOR("_creator"),
DESCRIPTION("_description"),
IDENTIFIER("_identifier"),
KEYWORDS("_keywords"),
LAST_MODIFIED_BY_USER("_last_modified_by_user"),
REVISION("_revision"),
SUBJECT("_subject"),
TITLE("_title");
private final String fieldName;
IMPLICIT_STRING_COLUMN(String fieldName) {
this.fieldName = fieldName;
}
public String getFieldName() {
return fieldName;
}
}
private enum IMPLICIT_TIMESTAMP_COLUMN {
/**
* The file created date
*/
CREATED("_created"),
/**
* Date the file was last printed, null if never printed.
*/
LAST_PRINTED("_last_printed"),
/**
* Date of last modification
*/
MODIFIED("_modified");
private final String fieldName;
IMPLICIT_TIMESTAMP_COLUMN(String fieldName) {
this.fieldName = fieldName;
}
public String getFieldName() {
return fieldName;
}
}
private enum IMPLICIT_LIST_COLUMN {
/**
* A list of the available sheets in the file.
*/
SHEETS("_sheets");
private final String fieldName;
IMPLICIT_LIST_COLUMN(String fieldName) {
this.fieldName = fieldName;
}
public String getFieldName() {
return fieldName;
}
}
private static final int ROW_CACHE_SIZE = 100;
private static final int BUFFER_SIZE = 4096;
private final ExcelReaderConfig readerConfig;
private final TreeSet<String> columnNameChecker;
private final RowSetLoader rowWriter;
private final CustomErrorContext errorContext;
private Sheet sheet;
private Row currentRow;
private StreamingWorkbook streamingWorkbook;
private InputStream fsStream;
private List<String> excelFieldNames;
private List<ScalarWriter> columnWriters;
private List<CellWriter> cellWriterArray;
private List<ScalarWriter> metadataColumnWriters;
private ScalarWriter sheetNameWriter;
private Iterator<Row> rowIterator;
private FileDescrip file;
private int totalColumnCount;
private boolean firstLine;
private int recordCount;
private Map<String, String> stringMetadata;
private Map<String, Date> dateMetadata;
private Map<String, List<String>> listMetadata;
static class ExcelReaderConfig {
final ExcelFormatPlugin plugin;
final int headerRow;
final int lastRow;
final int firstColumn;
final int lastColumn;
final boolean allTextMode;
final boolean ignoreErrors;
final String sheetName;
final int maxArraySize;
final int thresholdBytesForTempFiles;
final boolean useTempFilePackageParts;
ExcelReaderConfig(ExcelFormatPlugin plugin) {
this.plugin = plugin;
headerRow = plugin.getConfig().getHeaderRow();
lastRow = plugin.getConfig().getLastRow();
firstColumn = plugin.getConfig().getFirstColumn();
lastColumn = plugin.getConfig().getLastColumn();
allTextMode = plugin.getConfig().getAllTextMode();
ignoreErrors = plugin.getConfig().getIgnoreErrors();
sheetName = plugin.getConfig().getSheetName();
maxArraySize = plugin.getConfig().getMaxArraySize();
thresholdBytesForTempFiles = plugin.getConfig().getThresholdBytesForTempFiles();
useTempFilePackageParts = plugin.getConfig().isUseTempFilePackageParts();
}
}
public ExcelBatchReader(ExcelReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
this.errorContext = negotiator.parentErrorContext();
this.readerConfig = readerConfig;
this.file = negotiator.file();
ResultSetLoader loader = negotiator.build();
this.rowWriter = loader.writer();
this.columnNameChecker = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
this.firstLine = true;
openFile();
if (negotiator.providedSchema() != null) {
TupleMetadata providedSchema = negotiator.providedSchema();
logger.debug("Found a provided schema");
// Add Implicit columns to schema
SchemaBuilder builderForProvidedSchema = new SchemaBuilder();
builderForProvidedSchema.addAll(providedSchema);
TupleMetadata finalSchema = builderForProvidedSchema.build();
buildColumnWritersFromProvidedSchema(finalSchema);
// Add schema to file negotiator
logger.debug("Metadata added to provided schema.");
addMetadataToSchema(builderForProvidedSchema);
// Build column writer array
negotiator.tableSchema(finalSchema, true);
} else {
defineSchema(negotiator);
}
}
/**
* This method opens the Excel file, initializes the Streaming Excel Reader, and initializes the sheet variable.
*/
private void openFile() {
try {
fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
if (readerConfig.maxArraySize >= 0) {
IOUtils.setByteArrayMaxOverride(readerConfig.maxArraySize);
}
if (readerConfig.thresholdBytesForTempFiles >= 0) {
ZipInputStreamZipEntrySource.setThresholdBytesForTempFiles(readerConfig.thresholdBytesForTempFiles);
}
if (readerConfig.useTempFilePackageParts) {
ZipPackage.setUseTempFilePackageParts(readerConfig.useTempFilePackageParts);
}
// Open streaming reader
Workbook workbook = StreamingReader.builder()
.rowCacheSize(ROW_CACHE_SIZE)
.bufferSize(BUFFER_SIZE)
.setReadCoreProperties(true)
.open(fsStream);
streamingWorkbook = (StreamingWorkbook) workbook;
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failed to open open input file: %s", file.split().getPath().toString())
.addContext(e.getMessage())
.addContext(errorContext)
.build(logger);
}
sheet = getSheet();
// Populate Metadata Hashmap
populateMetadata(streamingWorkbook);
}
/**
* This function defines the schema from the header row.
*/
private void defineSchema(FileSchemaNegotiator negotiator) {
SchemaBuilder builder = new SchemaBuilder();
getColumnHeaders(builder);
negotiator.tableSchema(builder.buildSchema(), false);
}
private void buildColumnWritersFromProvidedSchema(TupleMetadata finalSchema) {
if (rowIterator == null) {
rowIterator = sheet.rowIterator();
}
// Case for empty sheet
if (rowIterator == null || !rowIterator.hasNext()) {
return;
}
columnWriters = new ArrayList<>();
metadataColumnWriters = new ArrayList<>();
cellWriterArray = new ArrayList<>();
// Get the number of columns.
// This method also advances the row reader to the location of the first row of data
if (!setFirstDataRow()) {
return;
}
totalColumnCount = finalSchema.size();
firstLine = false;
// Populate column writer array
for(MaterializedField field : finalSchema.toFieldList()) {
addColumnToArray(rowWriter, field.getName(), field.getType().getMinorType(), isMetadataField(field.getName()));
}
addMetadataWriters();
}
private void getColumnHeaders(SchemaBuilder builder) {
if (rowIterator == null) {
rowIterator = sheet.rowIterator();
}
// Case for empty sheet
if (rowIterator == null || !rowIterator.hasNext()) {
builder.buildSchema();
return;
}
columnWriters = new ArrayList<>();
metadataColumnWriters = new ArrayList<>();
// Get the number of columns.
// This method also advances the row reader to the location of the first row of data
if (!setFirstRow()) {
return;
}
excelFieldNames = new ArrayList<>();
cellWriterArray = new ArrayList<>();
//If there are no headers, create columns names of field_n
if (readerConfig.headerRow == -1) {
String missingFieldName;
for (short colNum = 0; colNum < currentRow.getLastCellNum(); colNum++) {
missingFieldName = MISSING_FIELD_NAME_HEADER + (colNum + 1);
makeColumn(builder, missingFieldName, MinorType.VARCHAR);
excelFieldNames.add(colNum, missingFieldName);
}
builder.buildSchema();
} else if (rowIterator.hasNext()) {
// Advance first row to header row, if defined.
if (readerConfig.headerRow > 0) {
skipToRow(readerConfig.headerRow);
}
//Get the header row and column count
totalColumnCount = currentRow.getLastCellNum();
//Read the header row
Row headerRow = currentRow;
String tempColumnName;
// Get the first data row.
currentRow = rowIterator.next();
Row firstDataRow = currentRow;
for (short colPosition = 0; colPosition < totalColumnCount; colPosition++) {
// We need this to get the header names
Cell cell = headerRow.getCell(colPosition, Row.MissingCellPolicy.CREATE_NULL_AS_BLANK);
// Since header names are most likely all Strings, we need the first row of actual data to get the data types
Cell dataCell = firstDataRow.getCell(colPosition, Row.MissingCellPolicy.CREATE_NULL_AS_BLANK);
switch (dataCell.getCellType()) {
case STRING:
tempColumnName = cell.getStringCellValue()
.replace(PARSER_WILDCARD, SAFE_WILDCARD)
.replaceAll("\\.", SAFE_SEPARATOR)
.replaceAll("\\n", HEADER_NEW_LINE_REPLACEMENT);
// Remove leading and trailing whitespace
tempColumnName = tempColumnName.trim();
tempColumnName = deconflictColumnNames(tempColumnName);
makeColumn(builder, tempColumnName, MinorType.VARCHAR);
excelFieldNames.add(colPosition, tempColumnName);
columnNameChecker.add(tempColumnName);
break;
default:
tempColumnName = cell.getStringCellValue();
// Remove leading and trailing whitespace
tempColumnName = tempColumnName.trim();
if (StringUtils.isEmpty(tempColumnName)) {
tempColumnName = MISSING_FIELD_NAME_HEADER + (colPosition + 1);
}
tempColumnName = deconflictColumnNames(tempColumnName);
makeColumn(builder, tempColumnName, MinorType.FLOAT8);
excelFieldNames.add(colPosition, tempColumnName);
columnNameChecker.add(tempColumnName);
break;
}
}
}
addMetadataToSchema(builder);
builder.buildSchema();
}
/**
* This function verifies whether a given column name is already present in the projected schema.
* If so, it appends _n to the column name. N will be incremented for every duplicate column
* @param columnName The original column
* @return The deconflicted column name
*/
private String deconflictColumnNames(String columnName) {
Pattern pattern = Pattern.compile("_(\\d+)$");
Matcher matcher = pattern.matcher(columnName);
while (columnNameChecker.contains(columnName)) {
if (matcher.find()) {
int index = Integer.parseInt(matcher.group(1));
index++;
columnName = matcher.replaceFirst("_" + index);
} else {
columnName = columnName + "_1";
}
}
return columnName;
}
/**
* Helper function to get the selected sheet from the configuration
* @return {@link Sheet} The selected sheet
*/
private Sheet getSheet() {
int sheetIndex = 0;
if (!readerConfig.sheetName.isEmpty()) {
sheetIndex = streamingWorkbook.getSheetIndex(readerConfig.sheetName);
}
//If the sheet name is not valid, throw user exception
if (sheetIndex == -1) {
throw UserException
.validationError()
.message("Could not open sheet " + readerConfig.sheetName)
.addContext(errorContext)
.build(logger);
} else {
return streamingWorkbook.getSheetAt(sheetIndex);
}
}
/**
* There are a few gotchas here in that we have to know the header row and count the physical number of cells
* in that row. This function also has to move the rowIterator object to the first row of data.
*/
private boolean setFirstRow() {
// Initialize
currentRow = rowIterator.next();
int rowNumber = readerConfig.headerRow > 0 ? sheet.getFirstRowNum() : 0;
// If the headerRow is greater than zero, advance the iterator to the first row of data
// This is unfortunately necessary since the streaming reader eliminated the getRow() method.
return skipToRow(rowNumber);
}
/**
* This function is used to set the iterator to the first row of actual data. When a schema is provided,
* we can safely skip the header row, and start reading the first row of data.
*/
private boolean setFirstDataRow() {
// Initialize
currentRow = rowIterator.next();
int rowNumber = readerConfig.headerRow > 0 ? sheet.getFirstRowNum() : 0;
// If the headerRow is greater than zero, advance the iterator to the first row of data
// This is unfortunately necessary since the streaming reader eliminated the getRow() method.
return skipToRow(rowNumber + 1);
}
@Override
public boolean next() {
recordCount = 0;
while (!rowWriter.isFull()) {
if (!nextLine(rowWriter)) {
return false;
}
}
return true;
}
private boolean skipToRow(int minRowNum) {
while (currentRow.getRowNum() < minRowNum && rowIterator.hasNext()) {
currentRow = rowIterator.next();
}
return currentRow.getRowNum() >= minRowNum;
}
private boolean nextLine(RowSetLoader rowWriter) {
if (rowIterator == null) {
rowIterator = sheet.rowIterator();
}
if (currentRow == null && rowIterator != null && rowIterator.hasNext()) {
currentRow = rowIterator.next();
}
if (currentRow == null || recordCount >= readerConfig.lastRow) {
return false;
}
// If the user specified that there are no headers, get the column count
if (readerConfig.headerRow == -1 && recordCount == 0) {
totalColumnCount = currentRow.getLastCellNum();
}
int colPosition = 0;
if (readerConfig.firstColumn != 0) {
colPosition = readerConfig.firstColumn - 1;
}
int finalColumn = totalColumnCount;
if (readerConfig.lastColumn != 0) {
finalColumn = readerConfig.lastColumn - 1;
}
rowWriter.start();
for (int colWriterIndex = 0; colPosition < finalColumn; colWriterIndex++) {
Cell cell = currentRow.getCell(colPosition);
populateColumnArray(cell, colPosition);
if (colWriterIndex < cellWriterArray.size()) {
CellWriter writer = cellWriterArray.get(colWriterIndex);
try {
writer.load(cell);
} catch (NumberFormatException e) {
if (readerConfig.ignoreErrors) {
logger.warn("Error writing cell: {} {}", cell.getStringCellValue(), e.getMessage());
writer.writeNull();
} else {
throw UserException.dataReadError()
.message("Error writing data: " + e.getMessage())
.addContext(errorContext)
.build(logger);
}
}
}
colPosition++;
}
if (firstLine) {
// Add metadata to column array
addMetadataWriters();
firstLine = false;
}
// Write the metadata
writeMetadata();
rowWriter.save();
recordCount++;
if (!rowIterator.hasNext()) {
return false;
} else {
currentRow = rowIterator.next();
return true;
}
}
private boolean isMetadataField(String fieldName) {
try {
return (IMPLICIT_STRING_COLUMN.valueOf(fieldName).getFieldName().length() > 0 ||
IMPLICIT_TIMESTAMP_COLUMN.valueOf(fieldName).getFieldName().length() > 0 ||
IMPLICIT_LIST_COLUMN.valueOf(fieldName).getFieldName().length() > 0);
} catch (IllegalArgumentException e) {
return false;
}
}
private void populateMetadata(StreamingWorkbook streamingWorkbook) {
CoreProperties fileMetadata = streamingWorkbook.getCoreProperties();
stringMetadata = new HashMap<>();
dateMetadata = new HashMap<>();
listMetadata = new HashMap<>();
// Populate String metadata columns
stringMetadata.put(IMPLICIT_STRING_COLUMN.CATEGORY.getFieldName(), fileMetadata.getCategory());
stringMetadata.put(IMPLICIT_STRING_COLUMN.CONTENT_STATUS.getFieldName(), fileMetadata.getContentStatus());
stringMetadata.put(IMPLICIT_STRING_COLUMN.CONTENT_TYPE.getFieldName(), fileMetadata.getContentType());
stringMetadata.put(IMPLICIT_STRING_COLUMN.CREATOR.getFieldName(), fileMetadata.getCreator());
stringMetadata.put(IMPLICIT_STRING_COLUMN.DESCRIPTION.getFieldName(), fileMetadata.getDescription());
stringMetadata.put(IMPLICIT_STRING_COLUMN.IDENTIFIER.getFieldName(), fileMetadata.getIdentifier());
stringMetadata.put(IMPLICIT_STRING_COLUMN.KEYWORDS.getFieldName(), fileMetadata.getKeywords());
stringMetadata.put(IMPLICIT_STRING_COLUMN.LAST_MODIFIED_BY_USER.getFieldName(), fileMetadata.getLastModifiedByUser());
stringMetadata.put(IMPLICIT_STRING_COLUMN.REVISION.getFieldName(), fileMetadata.getRevision());
stringMetadata.put(IMPLICIT_STRING_COLUMN.SUBJECT.getFieldName(), fileMetadata.getSubject());
stringMetadata.put(IMPLICIT_STRING_COLUMN.TITLE.getFieldName(), fileMetadata.getTitle());
// Populate Timestamp columns
dateMetadata.put(IMPLICIT_TIMESTAMP_COLUMN.CREATED.getFieldName(), fileMetadata.getCreated());
dateMetadata.put(IMPLICIT_TIMESTAMP_COLUMN.LAST_PRINTED.getFieldName(), fileMetadata.getLastPrinted());
dateMetadata.put(IMPLICIT_TIMESTAMP_COLUMN.MODIFIED.getFieldName(), fileMetadata.getModified());
// Populate List columns
listMetadata.put(IMPLICIT_LIST_COLUMN.SHEETS.getFieldName(), getSheetNames());
}
/**
* Function to populate the column array
* @param cell The input cell object
* @param colPosition The index of the column
*/
private void populateColumnArray(Cell cell, int colPosition) {
if (!firstLine) {
return;
}
// Case for empty data cell in first row. In this case, fall back to string.
if (cell == null) {
addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR, false);
return;
}
CellType cellType = cell.getCellType();
if (cellType == CellType.STRING || readerConfig.allTextMode) {
addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR, false);
} else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) {
// Case if the column is a date or time
addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP, false);
} else if (cellType == CellType.FORMULA) {
// Cells with formulae can return either strings or numbers.
CellType formulaCellType = cell.getCachedFormulaResultType();
if (formulaCellType == CellType.STRING) {
addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR, false);
} else {
addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8, false);
}
} else if (cellType == CellType.NUMERIC || cellType == CellType.BLANK || cellType == CellType._NONE) {
// Case if the column is numeric
addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8, false);
} else {
logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING.");
}
}
private void addMetadataToSchema(SchemaBuilder builder) {
// Add String Metadata columns
for (IMPLICIT_STRING_COLUMN name : IMPLICIT_STRING_COLUMN.values()) {
makeColumn(builder, name.getFieldName(), MinorType.VARCHAR);
}
// Add Date Column Names
for (IMPLICIT_TIMESTAMP_COLUMN name : IMPLICIT_TIMESTAMP_COLUMN.values()) {
makeColumn(builder, name.getFieldName(), MinorType.TIMESTAMP);
}
// Add List Column Names
for (IMPLICIT_LIST_COLUMN name : IMPLICIT_LIST_COLUMN.values()) {
makeColumn(builder, name.getFieldName(), MinorType.LIST);
}
}
private void makeColumn(SchemaBuilder builder, String name, MinorType type) {
// Verify supported types
switch (type) {
// The Excel Reader only Supports Strings, Floats and Date/Times
case VARCHAR:
case INT:
case FLOAT8:
case DATE:
case TIMESTAMP:
case TIME:
builder.addNullable(name, type);
break;
case LIST:
builder.addArray(name, MinorType.VARCHAR);
break;
default:
throw UserException
.validationError()
.message("Undefined column types")
.addContext("Field name", name)
.addContext("Type", type.toString())
.addContext(errorContext)
.build(logger);
}
}
private void addColumnToArray(TupleWriter rowWriter, String name, MinorType type, boolean isMetadata) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, type, DataMode.OPTIONAL);
colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, isMetadata);
index = rowWriter.addColumn(colSchema);
} else {
return;
}
if (isMetadata) {
metadataColumnWriters.add(rowWriter.scalar(index));
} else {
columnWriters.add(rowWriter.scalar(index));
if (readerConfig.allTextMode && type == MinorType.FLOAT8) {
cellWriterArray.add(new NumericStringWriter(columnWriters.get(index)));
} else if (readerConfig.allTextMode && type == MinorType.INT) {
cellWriterArray.add(new IntStringWriter(columnWriters.get(index)));
} else if (type == MinorType.VARCHAR) {
cellWriterArray.add(new StringCellWriter(columnWriters.get(index)));
} else if (type == MinorType.FLOAT8 || type == MinorType.FLOAT4) {
cellWriterArray.add(new NumericCellWriter(columnWriters.get(index)));
} else if (type == MinorType.INT) {
cellWriterArray.add(new IntCellWriter(columnWriters.get(index)));
} else if (type == MinorType.TIMESTAMP) {
cellWriterArray.add(new TimestampCellWriter(columnWriters.get(index)));
}
}
}
private void addMetadataWriters() {
for (IMPLICIT_STRING_COLUMN colName : IMPLICIT_STRING_COLUMN.values()) {
addColumnToArray(rowWriter, colName.getFieldName(), MinorType.VARCHAR, true);
}
for (IMPLICIT_TIMESTAMP_COLUMN colName : IMPLICIT_TIMESTAMP_COLUMN.values()) {
addColumnToArray(rowWriter, colName.getFieldName(), MinorType.TIMESTAMP, true);
}
}
private void writeMetadata() {
for (IMPLICIT_STRING_COLUMN column : IMPLICIT_STRING_COLUMN.values()) {
String value = stringMetadata.get(column.getFieldName());
int index = column.ordinal();
if (value == null) {
metadataColumnWriters.get(index).setNull();
} else {
metadataColumnWriters.get(index).setString(value);
}
}
for (IMPLICIT_TIMESTAMP_COLUMN column : IMPLICIT_TIMESTAMP_COLUMN.values()) {
Date timeValue = dateMetadata.get(column.getFieldName());
int index = column.ordinal() + IMPLICIT_STRING_COLUMN.values().length;
if (timeValue == null) {
metadataColumnWriters.get(index).setNull();
} else {
metadataColumnWriters.get(index).setTimestamp(Instant.ofEpochMilli(timeValue.getTime()));
}
}
// Write the sheet names. Since this is the only list field
int listIndex = IMPLICIT_STRING_COLUMN.values().length + IMPLICIT_TIMESTAMP_COLUMN.values().length;
String sheetColumnName = IMPLICIT_LIST_COLUMN.SHEETS.fieldName;
List<String> sheetNames = listMetadata.get(sheetColumnName);
if (sheetNameWriter == null) {
int sheetColumnIndex = rowWriter.tupleSchema().index(IMPLICIT_LIST_COLUMN.SHEETS.getFieldName());
if (sheetColumnIndex == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(sheetColumnName, MinorType.VARCHAR, DataMode.REPEATED);
colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
listIndex = rowWriter.addColumn(colSchema);
}
sheetNameWriter = rowWriter.column(listIndex).array().scalar();
}
for (String sheetName : sheetNames) {
sheetNameWriter.setString(sheetName);
}
}
private List<String> getSheetNames() {
List<String> sheets = new ArrayList<>();
int sheetCount = streamingWorkbook.getNumberOfSheets();
for (int i = 0; i < sheetCount; i++) {
sheets.add(streamingWorkbook.getSheetName(i));
}
return sheets;
}
@Override
public void close() {
if (streamingWorkbook != null) {
try {
streamingWorkbook.close();
} catch (IOException e) {
logger.warn("Error when closing Excel Workbook resource: {}", e.getMessage());
}
streamingWorkbook = null;
}
if (fsStream != null) {
try {
fsStream.close();
} catch (IOException e) {
logger.warn("Error when closing Excel File Stream resource: {}", e.getMessage());
}
fsStream = null;
}
}
public static class CellWriter {
ScalarWriter columnWriter;
CellWriter(ScalarWriter columnWriter) {
this.columnWriter = columnWriter;
}
public void load(Cell cell) {}
public void writeNull() {
columnWriter.setNull();
}
}
public class StringCellWriter extends ExcelBatchReader.CellWriter {
StringCellWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
@Override
public void load(Cell cell) {
if (cell == null) {
columnWriter.setNull();
} else {
String fieldValue = cell.getStringCellValue();
if (fieldValue == null && readerConfig.allTextMode) {
fieldValue = String.valueOf(cell.getNumericCellValue());
}
columnWriter.setString(fieldValue);
}
}
}
public static class NumericStringWriter extends ExcelBatchReader.CellWriter {
NumericStringWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
@Override
public void load(Cell cell) {
if (cell == null) {
columnWriter.setNull();
} else {
String fieldValue = String.valueOf(cell.getNumericCellValue());
columnWriter.setString(fieldValue);
}
}
}
public static class NumericCellWriter extends ExcelBatchReader.CellWriter {
NumericCellWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
@Override
public void load(Cell cell) {
if (cell == null) {
columnWriter.setNull();
} else {
double fieldNumValue = cell.getNumericCellValue();
columnWriter.setDouble(fieldNumValue);
}
}
}
public static class IntStringWriter extends ExcelBatchReader.CellWriter {
IntStringWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
@Override
public void load(Cell cell) {
if (cell == null) {
columnWriter.setNull();
} else {
String fieldValue = String.valueOf(cell.getNumericCellValue());
columnWriter.setString(fieldValue);
}
}
}
public static class IntCellWriter extends ExcelBatchReader.CellWriter {
IntCellWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
@Override
public void load(Cell cell) {
if (cell == null) {
columnWriter.setNull();
} else {
int fieldNumValue = (int) cell.getNumericCellValue();
columnWriter.setInt(fieldNumValue);
}
}
}
public static class TimestampCellWriter extends ExcelBatchReader.CellWriter {
TimestampCellWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
@Override
public void load(Cell cell) {
if (cell == null) {
columnWriter.setNull();
} else {
LocalDateTime dt = cell.getLocalDateTimeCellValue();
logger.debug("Cell value: {}", dt);
if (dt == null) {
columnWriter.setNull();
} else {
columnWriter.setTimestamp(dt.toInstant(ZoneOffset.UTC));
}
}
}
}
}