blob: 94b4caf3bd14451c06349707cdcc7c597a72b8e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.pdf;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import technology.tabula.RectangularTextContainer;
import technology.tabula.Table;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
public class PdfBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
private static final String NEW_FIELD_PREFIX = "field_";
private final List<PdfColumnWriter> writers;
private final PdfReaderConfig config;
private final int startingTableIndex;
private final FileDescrip file;
private PdfMetadataReader metadataReader;
private CustomErrorContext errorContext;
private RowSetLoader rowWriter;
private PDDocument document;
private SchemaBuilder builder;
private List<String> columnHeaders;
private Table currentTable;
private int currentTableIndex;
private List<String> firstRow;
private PdfRowIterator rowIterator;
private final FileSchemaNegotiator negotiator;
private int unregisteredColumnCount;
// Tables
private List<Table> tables;
static class PdfReaderConfig {
final PdfFormatPlugin plugin;
PdfReaderConfig(PdfFormatPlugin plugin) {
this.plugin = plugin;
}
}
public PdfBatchReader(PdfReaderConfig readerConfig, FileSchemaNegotiator negotiator) {
this.unregisteredColumnCount = 0;
this.writers = new ArrayList<>();
this.config = readerConfig;
this.startingTableIndex = readerConfig.plugin.getConfig().defaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().defaultTableIndex();
this.currentTableIndex = this.startingTableIndex;
this.columnHeaders = new ArrayList<>();
this.negotiator = negotiator;
this.file = negotiator.file();
errorContext = negotiator.parentErrorContext();
builder = new SchemaBuilder();
openFile();
metadataReader = new PdfMetadataReader(document);
// Get the tables if the user set the combine pages to true
if (config.plugin.getConfig().combinePages() ) {
tables = PdfUtils.extractTablesFromPDF(document, config.plugin.getConfig().getAlgorithm());
currentTable = tables.get(0);
} else {
currentTable = PdfUtils.getSpecificTable(document, startingTableIndex, config.plugin.getConfig().getAlgorithm());
tables = Collections.singletonList(currentTable);
// If the user specifies a table index, and that table does not exist, throw an exception.
if (currentTable == null && startingTableIndex != 0) {
throw UserException.dataReadError()
.message("The specified table index " + startingTableIndex + " does not exist in this file. ")
.addContext(errorContext)
.build(logger);
}
}
// Get the row iterator and grab the first row to build the schema
rowIterator = new PdfRowIterator(currentTable);
if (rowIterator.hasNext()) {
firstRow = PdfUtils.convertRowToStringArray(rowIterator.next());
}
// Support provided schema
TupleMetadata schema = null;
if (negotiator.providedSchema() != null) {
schema = negotiator.providedSchema();
negotiator.tableSchema(schema, false);
} else {
negotiator.tableSchema(buildSchema(), false);
}
ResultSetLoader loader = negotiator.build();
rowWriter = loader.writer();
metadataReader.setRowWriter(rowWriter);
// Build the schema
if (negotiator.providedSchema() != null) {
buildWriterListFromProvidedSchema(schema);
} else {
buildWriterList();
}
metadataReader.addImplicitColumnsToSchema();
}
@Override
public boolean next() {
while(!rowWriter.isFull()) {
if (config.plugin.getConfig().combinePages() &&
(!rowIterator.hasNext()) &&
currentTableIndex < (tables.size() - 1)) {
// Case for end of current page but more tables exist and combinePages is set to true.
// Get the next table
currentTableIndex++;
currentTable = tables.get(currentTableIndex);
// Update the row iterator
rowIterator = new PdfRowIterator(currentTable);
// Skip the first row in the new table because it most likely contains headers.
if (config.plugin.getConfig().extractHeaders()) {
rowIterator.next();
}
} else if (! rowIterator.hasNext()) {
// Special case for document with no tables
if (currentTable == null) {
rowWriter.start();
metadataReader.writeMetadata();
rowWriter.save();
}
return false;
}
// Process the row
processRow(rowIterator.next());
}
return true;
}
private void processRow(List<RectangularTextContainer> row) {
if (row == null || row.size() == 0) {
rowWriter.start();
metadataReader.writeMetadata();
rowWriter.save();
return;
}
String value;
rowWriter.start();
int rowPosition = 0;
for (RectangularTextContainer cellValue : row) {
value = cellValue.getText();
if (!Strings.isNullOrEmpty(value)) {
writers.get(rowPosition).load(row.get(rowPosition));
}
rowPosition++;
}
metadataReader.writeMetadata();
rowWriter.save();
}
@Override
public void close() {
if (document != null) {
AutoCloseables.closeSilently(document.getDocument());
AutoCloseables.closeSilently(document);
document = null;
}
}
/**
* This method opens the PDF file and finds the tables
*/
private void openFile() {
try {
InputStream fsStream = negotiator.file().fileSystem().openPossiblyCompressedStream(file.split().getPath());
if (Strings.isNullOrEmpty(config.plugin.getConfig().password())) {
document = PDDocument.load(fsStream);
} else {
// Case for encrypted files
document = PDDocument.load(fsStream, config.plugin.getConfig().password());
}
AutoCloseables.closeSilently(fsStream);
} catch (Exception e) {
throw UserException
.dataReadError(e)
.addContext("Failed to open open input file: %s", file.split().getPath().toString())
.addContext(errorContext)
.build(logger);
}
}
private TupleMetadata buildSchema() {
// Get column header names
columnHeaders = firstRow;
// Case for file with no tables
if (columnHeaders == null) {
return builder.buildSchema();
}
// Add columns to table
int index = 0;
for (String columnName : firstRow) {
if (Strings.isNullOrEmpty(columnName) || !config.plugin.getConfig().extractHeaders()) {
columnName = NEW_FIELD_PREFIX + unregisteredColumnCount;
columnHeaders.set(index, columnName);
unregisteredColumnCount++;
}
builder.addNullable(columnName, MinorType.VARCHAR);
index++;
}
return builder.buildSchema();
}
private void buildWriterList() {
// Case for file with no tables.
if (columnHeaders == null) {
return;
}
for (String header : columnHeaders) {
writers.add(new StringPdfColumnWriter(columnHeaders.indexOf(header), header, rowWriter));
}
}
private void buildWriterListFromProvidedSchema(TupleMetadata schema) {
if (schema == null) {
buildWriterList();
return;
}
int counter = 0;
for (MaterializedField field: schema.toFieldList()) {
String fieldName = field.getName();
MinorType type = field.getType().getMinorType();
columnHeaders.add(fieldName);
switch (type) {
case VARCHAR:
writers.add(new StringPdfColumnWriter(counter, fieldName, rowWriter));
break;
case SMALLINT:
case TINYINT:
case INT:
writers.add(new IntPdfColumnWriter(counter, fieldName, rowWriter));
break;
case BIGINT:
writers.add(new BigIntPdfColumnWriter(counter, fieldName, rowWriter));
break;
case FLOAT4:
case FLOAT8:
writers.add(new DoublePdfColumnWriter(counter, fieldName, rowWriter));
break;
case DATE:
writers.add(new DatePdfColumnWriter(counter, fieldName, rowWriter, negotiator));
break;
case TIME:
writers.add(new TimePdfColumnWriter(counter, fieldName, rowWriter, negotiator));
break;
case TIMESTAMP:
writers.add(new TimestampPdfColumnWriter(counter, fieldName, rowWriter, negotiator));
break;
default:
throw UserException.unsupportedError()
.message("PDF Reader with provided schema does not support " + type.name() + " data type.")
.addContext(errorContext)
.build(logger);
}
}
}
public abstract static class PdfColumnWriter {
final String columnName;
final ScalarWriter writer;
final int columnIndex;
public PdfColumnWriter(int columnIndex, String columnName, ScalarWriter writer) {
this.columnIndex = columnIndex;
this.columnName = columnName;
this.writer = writer;
}
public abstract void load (RectangularTextContainer<?> cell);
public abstract void loadFromValue(Object value);
}
public static class IntPdfColumnWriter extends PdfColumnWriter {
IntPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
}
@Override
public void load(RectangularTextContainer<?> cell) {
writer.setInt(Integer.parseInt(cell.getText()));
}
@Override
public void loadFromValue(Object value) {
writer.setInt((Integer) value);
}
}
public static class BigIntPdfColumnWriter extends PdfColumnWriter {
BigIntPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
}
@Override
public void load(RectangularTextContainer<?> cell) {
writer.setLong(Long.parseLong(cell.getText()));
}
@Override
public void loadFromValue(Object value) {
writer.setLong((Long) value);
}
}
public static class DoublePdfColumnWriter extends PdfColumnWriter {
DoublePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
}
@Override
public void load(RectangularTextContainer<?> cell) {
writer.setDouble(Double.parseDouble(cell.getText()));
}
@Override
public void loadFromValue(Object value) {
writer.setDouble((Double) value);
}
}
public static class StringPdfColumnWriter extends PdfColumnWriter {
StringPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
}
@Override
public void load(RectangularTextContainer<?> cell) {
writer.setString(cell.getText());
}
@Override
public void loadFromValue(Object value) {
if (! Strings.isNullOrEmpty((String) value)) {
writer.setString((String) value);
}
}
}
public static class DatePdfColumnWriter extends PdfColumnWriter {
private String dateFormat;
DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
if (metadata != null) {
this.dateFormat = metadata.property("drill.format");
}
}
@Override
public void load(RectangularTextContainer<?> cell) {
LocalDate localDate;
if (Strings.isNullOrEmpty(this.dateFormat)) {
localDate = LocalDate.parse(cell.getText());
} else {
localDate = LocalDate.parse(cell.getText(), DateTimeFormatter.ofPattern(dateFormat));
}
writer.setDate(localDate);
}
@Override
public void loadFromValue(Object value) {
if (value != null) {
writer.setDate(LocalDate.parse((String) value));
}
}
}
public static class TimePdfColumnWriter extends PdfColumnWriter {
private String dateFormat;
TimePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
if (metadata != null) {
this.dateFormat = metadata.property("drill.format");
}
}
@Override
public void load(RectangularTextContainer<?> cell) {
LocalTime localTime;
if (Strings.isNullOrEmpty(this.dateFormat)) {
localTime = LocalTime.parse(cell.getText());
} else {
localTime = LocalTime.parse(cell.getText(), DateTimeFormatter.ofPattern(dateFormat));
}
writer.setTime(localTime);
}
@Override
public void loadFromValue(Object value) {
if (value != null) {
writer.setTime(LocalTime.parse((String) value));
}
}
}
public static class TimestampPdfColumnWriter extends PdfColumnWriter {
private String dateFormat;
TimestampPdfColumnWriter(int columnIndex, String columnName, RowSetLoader rowWriter) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
}
TimestampPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileSchemaNegotiator negotiator) {
super(columnIndex, columnName, rowWriter.scalar(columnName));
ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
if (metadata != null) {
this.dateFormat = metadata.property("drill.format");
}
}
@Override
public void load(RectangularTextContainer<?> cell) {
Instant timestamp = null;
if (Strings.isNullOrEmpty(this.dateFormat)) {
timestamp = Instant.parse(cell.getText());
} else {
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
Date parsedDate = simpleDateFormat.parse(cell.getText());
timestamp = Instant.ofEpochMilli(parsedDate.getTime());
} catch (ParseException e) {
logger.error("Error parsing timestamp: " + e.getMessage());
}
}
writer.setTimestamp(timestamp);
}
@Override
public void loadFromValue(Object value) {
if (value != null) {
GregorianCalendar calendar = (GregorianCalendar) value;
writer.setTimestamp(calendar.getTime().toInstant());
}
}
}
}