DRILL-7641: Convert Excel Reader to use Streaming Reader
closes #2024
diff --git a/contrib/format-excel/pom.xml b/contrib/format-excel/pom.xml
index ca6b6ab..c8be2fe 100644
--- a/contrib/format-excel/pom.xml
+++ b/contrib/format-excel/pom.xml
@@ -64,6 +64,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.pjfanning</groupId>
+ <artifactId>excel-streaming-reader</artifactId>
+ <version>2.3.2</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index 32c062d..88d124b 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.excel;
+import com.github.pjfanning.xlsx.StreamingReader;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -28,19 +29,15 @@
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellType;
-import org.apache.poi.ss.usermodel.CellValue;
import org.apache.poi.ss.usermodel.DateUtil;
-import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
-import org.apache.poi.xssf.usermodel.XSSFRow;
-import org.apache.poi.xssf.usermodel.XSSFSheet;
-import org.apache.poi.xssf.usermodel.XSSFWorkbook;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -50,6 +47,8 @@
import java.util.Iterator;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
import java.util.TimeZone;
public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
@@ -66,23 +65,25 @@
private static final String MISSING_FIELD_NAME_HEADER = "field_";
+ private static final int ROW_CACHE_SIZE = 100;
+
+ private static final int BUFFER_SIZE = 4096;
+
private final ExcelReaderConfig readerConfig;
- private XSSFSheet sheet;
+ private Sheet sheet;
- private XSSFWorkbook workbook;
+ private Row currentRow;
+
+ private Workbook workbook;
private InputStream fsStream;
- private FormulaEvaluator evaluator;
+ private List<String> excelFieldNames;
- private ArrayList<String> excelFieldNames;
+ private List<ScalarWriter> columnWriters;
- private ArrayList<ScalarWriter> columnWriters;
-
- private ArrayList<CellType> cellTypes;
-
- private ArrayList<CellWriter> cellWriterArray;
+ private List<CellWriter> cellWriterArray;
private Iterator<Row> rowIterator;
@@ -90,14 +91,10 @@
private int totalColumnCount;
- private int lineCount;
-
private boolean firstLine;
private FileSplit split;
- private ResultSetLoader loader;
-
private int recordCount;
static class ExcelReaderConfig {
@@ -134,92 +131,104 @@
@Override
public boolean open(FileSchemaNegotiator negotiator) {
split = negotiator.split();
- loader = negotiator.build();
+ ResultSetLoader loader = negotiator.build();
rowWriter = loader.writer();
openFile(negotiator);
defineSchema();
return true;
}
+ /**
+ * This method opens the Excel file, initializes the Streaming Excel Reader, and initializes the sheet variable.
+ * @param negotiator The Drill file negotiator object that represents the file system
+ */
private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
try {
fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
- workbook = new XSSFWorkbook(fsStream);
+
+ // Open streaming reader
+ workbook = StreamingReader.builder()
+ .rowCacheSize(ROW_CACHE_SIZE)
+ .bufferSize(BUFFER_SIZE)
+ .open(fsStream);
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failed to open open input file: %s", split.getPath().toString())
- .message(e.getMessage())
+ .addContext(e.getMessage())
.build(logger);
}
-
- // Evaluate formulae
- evaluator = workbook.getCreationHelper().createFormulaEvaluator();
-
- workbook.setMissingCellPolicy(Row.MissingCellPolicy.CREATE_NULL_AS_BLANK);
sheet = getSheet();
}
/**
* This function defines the schema from the header row.
- * @return TupleMedata of the discovered schema
*/
- private TupleMetadata defineSchema() {
+ private void defineSchema() {
SchemaBuilder builder = new SchemaBuilder();
- return getColumnHeaders(builder);
+ getColumnHeaders(builder);
}
- private TupleMetadata getColumnHeaders(SchemaBuilder builder) {
+ private void getColumnHeaders(SchemaBuilder builder) {
//Get the field names
- int columnCount = 0;
+ int columnCount;
- // Case for empty sheet.
- if (sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) {
- return builder.buildSchema();
+ // Case for empty sheet
+ if (sheet.getLastRowNum() == 0) {
+ builder.buildSchema();
+ return;
}
+ rowIterator = sheet.iterator();
+
// Get the number of columns.
columnCount = getColumnCount();
- excelFieldNames = new ArrayList<>(columnCount);
- cellWriterArray = new ArrayList<>(columnCount);
- rowIterator = sheet.iterator();
+ excelFieldNames = new ArrayList<>();
+ cellWriterArray = new ArrayList<>();
//If there are no headers, create columns names of field_n
if (readerConfig.headerRow == -1) {
String missingFieldName;
- for (int i = 0; i < columnCount; i++) {
+ int i = 0;
+
+ for (Cell c : currentRow) {
missingFieldName = MISSING_FIELD_NAME_HEADER + (i + 1);
makeColumn(builder, missingFieldName, TypeProtos.MinorType.VARCHAR);
excelFieldNames.add(i, missingFieldName);
+ i++;
}
- columnWriters = new ArrayList<>(excelFieldNames.size());
- cellTypes = new ArrayList<>(excelFieldNames.size());
+ columnWriters = new ArrayList<>(columnCount);
- return builder.buildSchema();
+ builder.buildSchema();
} else if (rowIterator.hasNext()) {
- //Find the header row
- int firstHeaderRow = getFirstHeaderRow();
-
- while (lineCount < firstHeaderRow) {
- Row row = rowIterator.next();
- lineCount++;
- }
//Get the header row and column count
- Row row = rowIterator.next();
- totalColumnCount = row.getLastCellNum();
- cellTypes = new ArrayList<>(totalColumnCount);
+ totalColumnCount = currentRow.getLastCellNum();
+ Cell dataCell = null;
//Read the header row
- Iterator<Cell> cellIterator = row.cellIterator();
+ Iterator<Cell> headerRowIterator = currentRow.cellIterator();
int colPosition = 0;
- String tempColumnName = "";
+ String tempColumnName;
- while (cellIterator.hasNext()) {
- Cell cell = cellIterator.next();
+ // Get the first data row.
+ currentRow = rowIterator.next();
+ Row firstDataRow = currentRow;
+ Iterator<Cell> dataRowIterator = firstDataRow.cellIterator();
- CellValue cellValue = evaluator.evaluate(cell);
- switch (cellValue.getCellType()) {
+
+ while (headerRowIterator.hasNext()) {
+ // We need this to get the header names
+ Cell cell = headerRowIterator.next();
+
+ // Since header names are most likely all Strings, we need the first row of actual data to get the data types
+ try {
+ dataCell = dataRowIterator.next();
+ } catch (NoSuchElementException e) {
+ // Do nothing... empty value in data cell
+ }
+
+ switch (dataCell.getCellType()) {
case STRING:
tempColumnName = cell.getStringCellValue()
.replace(PARSER_WILDCARD, SAFE_WILDCARD)
@@ -227,28 +236,26 @@
.replaceAll("\\n", HEADER_NEW_LINE_REPLACEMENT);
makeColumn(builder, tempColumnName, TypeProtos.MinorType.VARCHAR);
excelFieldNames.add(colPosition, tempColumnName);
- cellTypes.add(CellType.STRING);
break;
- case NUMERIC:
- tempColumnName = String.valueOf(cell.getNumericCellValue());
+ case FORMULA:
+ case NUMERIC:
+ tempColumnName = cell.getStringCellValue();
makeColumn(builder, tempColumnName, TypeProtos.MinorType.FLOAT8);
excelFieldNames.add(colPosition, tempColumnName);
- cellTypes.add(CellType.NUMERIC);
break;
}
colPosition++;
}
}
- columnWriters = new ArrayList<>(excelFieldNames.size());
- return builder.buildSchema();
+ columnWriters = new ArrayList<>();
+ builder.buildSchema();
}
/**
* Helper function to get the selected sheet from the configuration
- *
- * @return XSSFSheet The selected sheet
+ * @return Sheet The selected sheet
*/
- private XSSFSheet getSheet() {
+ private Sheet getSheet() {
int sheetIndex = 0;
if (!readerConfig.sheetName.isEmpty()) {
sheetIndex = workbook.getSheetIndex(readerConfig.sheetName);
@@ -267,14 +274,21 @@
/**
* Returns the column count. There are a few gotchas here in that we have to know the header row and count the physical number of cells
- * in that row. Since the user can define the header row,
+ * in that row. This function also has to move the rowIterator object to the first row of data.
* @return The number of actual columns
*/
private int getColumnCount() {
+ // Initialize
+ currentRow = rowIterator.next();
int rowNumber = readerConfig.headerRow > 0 ? sheet.getFirstRowNum() : 0;
- XSSFRow sheetRow = sheet.getRow(rowNumber);
- return sheetRow != null ? sheetRow.getPhysicalNumberOfCells() : 0;
+ // If the headerRow is greater than zero, advance the iterator to the first row of data
+ // This is unfortunately necessary since the streaming reader eliminated the getRow() method.
+ for (int i = 1; i < rowNumber; i++) {
+ currentRow = rowIterator.next();
+ }
+
+ return currentRow.getPhysicalNumberOfCells();
}
@Override
@@ -289,83 +303,78 @@
}
private boolean nextLine(RowSetLoader rowWriter) {
- if( sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) {
+ if (sheet.getLastRowNum() == 0) {
// Case for empty sheet
return false;
- } else if (!rowIterator.hasNext()) {
- return false;
} else if (recordCount >= readerConfig.lastRow) {
return false;
}
- int lastRow = readerConfig.lastRow;
- if (recordCount < lastRow && rowIterator.hasNext()) {
- lineCount++;
-
- Row row = rowIterator.next();
-
- // If the user specified that there are no headers, get the column count
- if (readerConfig.headerRow == -1 && recordCount == 0) {
- this.totalColumnCount = row.getLastCellNum();
- }
-
- int colPosition = 0;
- if (readerConfig.firstColumn != 0) {
- colPosition = readerConfig.firstColumn - 1;
- }
-
- int finalColumn = totalColumnCount;
- if (readerConfig.lastColumn != 0) {
- finalColumn = readerConfig.lastColumn - 1;
- }
- rowWriter.start();
- for (int colWriterIndex = 0; colPosition < finalColumn; colPosition++) {
- Cell cell = row.getCell(colPosition);
-
- CellValue cellValue = evaluator.evaluate(cell);
-
- populateColumnArray(cell, cellValue, colPosition);
- cellWriterArray.get(colWriterIndex).load(cell);
-
- colWriterIndex++;
- }
-
- if (firstLine) {
- firstLine = false;
- }
- rowWriter.save();
- recordCount++;
- return true;
- } else {
- return false;
+ // If the user specified that there are no headers, get the column count
+ if (readerConfig.headerRow == -1 && recordCount == 0) {
+ totalColumnCount = currentRow.getLastCellNum();
}
+ int colPosition = 0;
+ if (readerConfig.firstColumn != 0) {
+ colPosition = readerConfig.firstColumn - 1;
+ }
+
+ int finalColumn = totalColumnCount;
+ if (readerConfig.lastColumn != 0) {
+ finalColumn = readerConfig.lastColumn - 1;
+ }
+ rowWriter.start();
+ for (int colWriterIndex = 0; colPosition < finalColumn; colWriterIndex++) {
+ Cell cell = currentRow.getCell(colPosition);
+
+ populateColumnArray(cell, colPosition);
+ cellWriterArray.get(colWriterIndex).load(cell);
+
+ colPosition++;
+ }
+
+ if (firstLine) {
+ firstLine = false;
+ }
+ rowWriter.save();
+ recordCount++;
+
+ if (!rowIterator.hasNext()) {
+ return false;
+ } else {
+ currentRow = rowIterator.next();
+ return true;
+ }
}
/**
* Function to populate the column array
* @param cell The input cell object
- * @param cellValue The cell value
* @param colPosition The index of the column
*/
- private void populateColumnArray(Cell cell, CellValue cellValue, int colPosition) {
+ private void populateColumnArray(Cell cell, int colPosition) {
if (!firstLine) {
return;
}
- if (cellValue == null) {
+ // Case for empty data cell in first row. In this case, fall back to string.
+ if (cell == null) {
addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+ return;
+ }
+
+ CellType cellType = cell.getCellType();
+ if (cellType == CellType.STRING || readerConfig.allTextMode) {
+ addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+ } else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) {
+ // Case if the column is a date or time
+ addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP);
+ } else if (cellType == CellType.NUMERIC || cellType == CellType.FORMULA) {
+ // Case if the column is numeric
+ addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8);
} else {
- CellType cellType = cellValue.getCellType();
- if (cellType == CellType.STRING || readerConfig.allTextMode) {
- addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
- } else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) {
- addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP);
- } else if (cellType == CellType.NUMERIC) {
- addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8);
- } else {
- logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING.");
- }
+ logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING.");
}
}
@@ -411,28 +420,13 @@
}
}
- /**
- * Returns the index of the first row of actual data. This function is to be used to find the header row as the POI skips blank rows.
- * @return The headerRow index, corrected for blank rows
- */
- private int getFirstHeaderRow() {
- int firstRow = sheet.getFirstRowNum();
- int headerRow = readerConfig.headerRow;
-
- if (headerRow < 0) {
- return firstRow;
- } else {
- return headerRow;
- }
- }
-
@Override
public void close() {
if (workbook != null) {
try {
workbook.close();
} catch (IOException e) {
- logger.warn("Error when closing XSSFWorkbook resource: {}", e.getMessage());
+ logger.warn("Error when closing Excel Workbook resource: {}", e.getMessage());
}
workbook = null;
}
@@ -441,13 +435,13 @@
try {
fsStream.close();
} catch (IOException e) {
- logger.warn("Error when closing XSSFWorkbook resource: {}", e.getMessage());
+ logger.warn("Error when closing Excel File Stream resource: {}", e.getMessage());
}
fsStream = null;
}
}
- public class CellWriter {
+ public static class CellWriter {
ScalarWriter columnWriter;
CellWriter(ScalarWriter columnWriter) {
@@ -463,11 +457,10 @@
}
public void load(Cell cell) {
- CellValue cellValue = evaluator.evaluate(cell);
- if (cellValue == null) {
+ if (cell == null) {
columnWriter.setNull();
} else {
- String fieldValue = cellValue.getStringValue();
+ String fieldValue = cell.getStringCellValue();
if (fieldValue == null && readerConfig.allTextMode) {
fieldValue = String.valueOf(cell.getNumericCellValue());
}
@@ -476,52 +469,47 @@
}
}
- public class NumericStringWriter extends ExcelBatchReader.CellWriter {
+ public static class NumericStringWriter extends ExcelBatchReader.CellWriter {
NumericStringWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
public void load(Cell cell) {
- String fieldValue = String.valueOf(cell.getNumericCellValue());
-
- if (fieldValue == null) {
+ if (cell == null) {
columnWriter.setNull();
} else {
+ String fieldValue = String.valueOf(cell.getNumericCellValue());
columnWriter.setString(fieldValue);
}
}
}
- public class NumericCellWriter extends ExcelBatchReader.CellWriter {
+ public static class NumericCellWriter extends ExcelBatchReader.CellWriter {
NumericCellWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
public void load(Cell cell) {
- CellValue cellValue = evaluator.evaluate(cell);
- if (cellValue == null) {
+ if (cell == null) {
columnWriter.setNull();
} else {
- double fieldNumValue = cellValue.getNumberValue();
+ double fieldNumValue = cell.getNumericCellValue();
columnWriter.setDouble(fieldNumValue);
}
}
}
- public class TimestampCellWriter extends ExcelBatchReader.CellWriter {
+ public static class TimestampCellWriter extends ExcelBatchReader.CellWriter {
TimestampCellWriter(ScalarWriter columnWriter) {
super(columnWriter);
}
public void load(Cell cell) {
- CellValue cellValue = evaluator.evaluate(cell);
-
- if (cellValue == null) {
+ if (cell == null) {
columnWriter.setNull();
} else {
- logger.debug("Cell value: {}", cellValue.getNumberValue());
-
- Date dt = DateUtil.getJavaDate(cellValue.getNumberValue(), TimeZone.getTimeZone("UTC"));
+ logger.debug("Cell value: {}", cell.getNumericCellValue());
+ Date dt = DateUtil.getJavaDate(cell.getNumericCellValue(), TimeZone.getTimeZone("UTC"));
Instant timeStamp = new Instant(dt.toInstant().getEpochSecond() * 1000);
columnWriter.setTimestamp(timeStamp);
}
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index fb7df5c..5700b40 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -73,7 +73,8 @@
testBuilder()
.sqlQuery(sql)
- .ordered().baselineColumns("id", "first_name", "last_name", "email", "gender", "birthdate", "balance", "order_count", "average_order")
+ .unOrdered()
+ .baselineColumns("id", "first_name", "last_name", "email", "gender", "birthdate", "balance", "order_count", "average_order")
.baselineValues(1.0, "Cornelia", "Matej", "cmatej0@mtv.com", "Female", "10/31/1974", 735.29, 22.0, 33.42227272727273)
.baselineValues(2.0, "Nydia", "Heintsch", "nheintsch1@godaddy.com", "Female", "12/10/1966", 784.14, 22.0, 35.64272727272727)
.baselineValues(3.0, "Waiter", "Sherel", "wsherel2@utexas.edu", "Male", "3/12/1961", 172.36, 17.0, 10.138823529411766)
@@ -320,7 +321,7 @@
testBuilder()
.sqlQuery(sql)
- .ordered()
+ .unOrdered()
.baselineColumns("col1", "col2", "col3")
.baselineValues(1.0,2.0,null)
.baselineValues(2.0,4.0,null)
@@ -339,11 +340,12 @@
testBuilder()
.sqlQuery(sql)
- .ordered().baselineColumns("col1", "col2")
- .baselineValues("1.0", "Bob")
- .baselineValues("2.0", "Steve")
- .baselineValues("3.0", "Anne")
- .baselineValues("Bob", "3.0")
+ .unOrdered()
+ .baselineColumns("col1", "col2")
+ .baselineValues("1", "Bob")
+ .baselineValues("2", "Steve")
+ .baselineValues("3", "Anne")
+ .baselineValues("Bob", "3")
.go();
}