/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.piggybank.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.Logger;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
/**
* CSV loading and storing with support for multi-line fields,
* and escaping of delimiters and double quotes within fields;
* uses CSV conventions of Excel 2007.
*
* Arguments allow for control over:
*
* Which field delimiter to use (default = ',')
* Whether line breaks are allowed inside of fields (YES_MULTILINE = yes, NO_MULTILINE = no, default = no)
* How line breaks are to be written when storing (UNIX = LF, WINDOWS = CRLF, NOCHANGE = leave them unchanged; default = NOCHANGE)
* What to do with header rows (first line of each file):
* On load: READ_INPUT_HEADER = read header rows, SKIP_INPUT_HEADER = do not read header rows, default = read header rows
* On store: WRITE_OUTPUT_HEADER = write a header row, SKIP_OUTPUT_HEADER = do not write a header row, default = do not write a header row
*
* Usage:
*
* STORE x INTO '<destFileName>'
* USING org.apache.pig.piggybank.storage.CSVExcelStorage(
* [DELIMITER[,
* {YES_MULTILINE | NO_MULTILINE}[,
* {UNIX | WINDOWS | NOCHANGE}[,
* {READ_INPUT_HEADER | SKIP_INPUT_HEADER | WRITE_OUTPUT_HEADER | SKIP_OUTPUT_HEADER}]]]]
* );
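*
* Loading takes the same arguments (a sketch; the file name and schema are illustrative):
*
* x = LOAD '&lt;srcFileName&gt;'
* USING org.apache.pig.piggybank.storage.CSVExcelStorage(
* ',', 'YES_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER'
* ) AS (name: chararray, age: int);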
*
* Linebreak settings are only used during store; during load, no conversion is performed.
*
* WARNING: A danger with enabling multiline fields during load is that unbalanced
* double quotes will cause slurping up of input until a balancing double
* quote is found, or until something breaks. If you are not expecting
* newlines within fields it is therefore more robust to use NO_MULTILINE,
* which is the default for that reason.
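*
* For example (a sketch of the behavior), with YES_MULTILINE the two
* physical input lines
*
* a,"b
* c",d
*
* are read back as the single tuple (a, b\nc, d), while NO_MULTILINE
* ends the record at the first line break regardless.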
*
* This is Andreas Paepcke's <paepcke@cs.stanford.edu> CSVExcelStorage with a few modifications.
*/
public class CSVExcelStorage extends PigStorage implements StoreFuncInterface, LoadPushDown {
public enum Linebreaks { UNIX, WINDOWS, NOCHANGE }
public enum Multiline { YES, NO }
public enum Headers { DEFAULT, READ_INPUT_HEADER, SKIP_INPUT_HEADER, WRITE_OUTPUT_HEADER, SKIP_OUTPUT_HEADER }
protected final static byte LINEFEED = '\n';
protected final static byte CARRIAGE_RETURN = '\r';
protected final static byte DOUBLE_QUOTE = '"';
protected final static byte RECORD_DEL = LINEFEED;
private static final String FIELD_DELIMITER_DEFAULT_STR = ",";
private static final String MULTILINE_DEFAULT_STR = "NO_MULTILINE";
private static final String EOL_DEFAULT_STR = "NOCHANGE";
private static final String HEADER_DEFAULT_STR = "DEFAULT";
long end = Long.MAX_VALUE;
private byte fieldDelimiter = ',';
private Multiline multilineTreatment = Multiline.NO;
private Linebreaks eolTreatment = Linebreaks.NOCHANGE;
private Headers headerTreatment = Headers.DEFAULT;
private ArrayList<Object> mProtoTuple = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
private String udfContextSignature;
private String loadLocation;
private boolean[] mRequiredColumns = null;
private boolean mRequiredColumnsInitialized = false;
final Logger logger = Logger.getLogger(getClass().getName());
@SuppressWarnings("rawtypes")
protected RecordReader in = null;
// For replacing LF with CRLF (Unix --> Windows end-of-line convention):
Pattern loneLFDetectorPattern = Pattern.compile("([^\r])\n", Pattern.DOTALL | Pattern.MULTILINE);
Matcher loneLFDetector = loneLFDetectorPattern.matcher("");
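// For example, "a\nb" becomes "a\r\nb", while an existing "\r\n" is left
// untouched (the [^\r] guard only matches a LF not preceded by a CR).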
// For removing CR (Windows --> Unix):
Pattern CRLFDetectorPattern = Pattern.compile("\r\n", Pattern.DOTALL | Pattern.MULTILINE);
Matcher CRLFDetector = CRLFDetectorPattern.matcher("");
// For building the output tuples in putNext():
TupleFactory tupleMaker = TupleFactory.getInstance();
private boolean getNextInQuotedField;
private int getNextFieldID;
private boolean nextTupleSkipChar;
// For handling headers
private boolean loadingFirstRecord = true;
private boolean storingFirstRecord = true;
private String header = null;
private int splitIndex;
private static final String SCHEMA_SIGNATURE = "pig.csvexcelstorage.schema";
protected ResourceSchema schema = null;
/*-----------------------------------------------------
| Constructors
-----------------------------------------------------*/
public CSVExcelStorage() {
super(FIELD_DELIMITER_DEFAULT_STR);
initializeInstance(FIELD_DELIMITER_DEFAULT_STR, MULTILINE_DEFAULT_STR, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
}
public CSVExcelStorage(String delimiter) {
super(delimiter);
initializeInstance(delimiter, MULTILINE_DEFAULT_STR, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
}
public CSVExcelStorage(String delimiter, String multilineTreatmentStr) {
super(delimiter);
initializeInstance(delimiter, multilineTreatmentStr, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
}
public CSVExcelStorage(String delimiter, String multilineTreatmentStr, String eolTreatmentStr) {
super(delimiter);
initializeInstance(delimiter, multilineTreatmentStr, eolTreatmentStr, HEADER_DEFAULT_STR);
}
public CSVExcelStorage(String delimiter, String multilineTreatmentStr, String eolTreatmentStr, String headerTreatmentStr) {
super(delimiter);
initializeInstance(delimiter, multilineTreatmentStr, eolTreatmentStr, headerTreatmentStr);
}
private void initializeInstance(String delimiter, String multilineTreatmentStr, String eolTreatmentStr, String headerTreatmentStr) {
fieldDelimiter = StorageUtil.parseFieldDel(delimiter);
multilineTreatment = canonicalizeMultilineTreatmentRequest(multilineTreatmentStr);
eolTreatment = canonicalizeEOLTreatmentRequest(eolTreatmentStr);
headerTreatment = canonicalizeHeaderTreatmentRequest(headerTreatmentStr);
}
private Multiline canonicalizeMultilineTreatmentRequest(String multilineTreatmentStr) {
if (multilineTreatmentStr.equalsIgnoreCase("YES_MULTILINE"))
return Multiline.YES;
else if (multilineTreatmentStr.equalsIgnoreCase("NO_MULTILINE"))
return Multiline.NO;
throw new IllegalArgumentException(
"Unrecognized multiline treatment argument " + multilineTreatmentStr + ". " +
"Should be either 'YES_MULTILINE' or 'NO_MULTILINE'");
}
private Linebreaks canonicalizeEOLTreatmentRequest(String eolTreatmentStr) {
if (eolTreatmentStr.equalsIgnoreCase("UNIX"))
return Linebreaks.UNIX;
else if (eolTreatmentStr.equalsIgnoreCase("WINDOWS"))
return Linebreaks.WINDOWS;
else if (eolTreatmentStr.equalsIgnoreCase("NOCHANGE"))
return Linebreaks.NOCHANGE;
throw new IllegalArgumentException(
"Unrecognized end-of-line treatment argument " + eolTreatmentStr + ". " +
"Should be one of 'UNIX', 'WINDOWS', or 'NOCHANGE'");
}
private Headers canonicalizeHeaderTreatmentRequest(String headerTreatmentStr) {
if (headerTreatmentStr.equalsIgnoreCase("DEFAULT"))
return Headers.DEFAULT;
else if (headerTreatmentStr.equalsIgnoreCase("READ_INPUT_HEADER"))
return Headers.READ_INPUT_HEADER;
else if (headerTreatmentStr.equalsIgnoreCase("SKIP_INPUT_HEADER"))
return Headers.SKIP_INPUT_HEADER;
else if (headerTreatmentStr.equalsIgnoreCase("WRITE_OUTPUT_HEADER"))
return Headers.WRITE_OUTPUT_HEADER;
else if (headerTreatmentStr.equalsIgnoreCase("SKIP_OUTPUT_HEADER"))
return Headers.SKIP_OUTPUT_HEADER;
throw new IllegalArgumentException(
"Unrecognized header treatment argument " + headerTreatmentStr + ". " +
"Should be one of 'READ_INPUT_HEADER', 'SKIP_INPUT_HEADER', 'WRITE_OUTPUT_HEADER', 'SKIP_OUTPUT_HEADER'");
}
// ---------------------------------------- STORAGE -----------------------------
public void checkSchema(ResourceSchema s) throws IOException {
// We don't actually check the schema here; we just
// store it so the backend can use it when writing.
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
p.setProperty(SCHEMA_SIGNATURE, s.toString());
}
public void prepareToWrite(RecordWriter writer) {
// Get the schema string from the UDFContext object.
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
String strSchema = p.getProperty(SCHEMA_SIGNATURE);
if (strSchema != null) {
// Parse the schema from the string stored in the properties object.
try {
schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
} catch (ParserException pex) {
logger.warn("Could not parse schema for storing.");
}
}
if (headerTreatment == Headers.DEFAULT) {
headerTreatment = Headers.SKIP_OUTPUT_HEADER;
}
// PigStorage's prepareToWrite()
super.prepareToWrite(writer);
}
/* (non-Javadoc)
* @see org.apache.pig.builtin.PigStorage#putNext(org.apache.pig.data.Tuple)
*
* Given a tuple that corresponds to one record, write
* it out as CSV, converting among Unix/Windows line
* breaks as requested in the instantiation. Also take
* care of escaping field delimiters, double quotes,
* and line breaks embedded within fields.
*
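* For example (a sketch of the rules applied below): the field value
*
* say "hi", please
*
* is written as
*
* "say ""hi"", please"
*
* because it contains both a double quote and the field delimiter.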
*/
@Override
public void putNext(Tuple tupleToWrite) throws IOException {
// If WRITE_OUTPUT_HEADER, store a header record with the names of each field
if (storingFirstRecord && headerTreatment == Headers.WRITE_OUTPUT_HEADER && schema != null) {
ArrayList<Object> headerProtoTuple = new ArrayList<Object>();
ResourceFieldSchema[] fields = schema.getFields();
for (ResourceFieldSchema field : fields) {
headerProtoTuple.add(field.getName());
}
super.putNext(tupleMaker.newTuple(headerProtoTuple));
}
storingFirstRecord = false;
ArrayList<Object> mProtoTuple = new ArrayList<Object>();
int embeddedNewlineIndex = -1;
int embeddedCarriageReturn = -1;
String fieldStr = null;
// For good debug messages:
int fieldCounter = -1;
// Do the escaping:
for (Object field : tupleToWrite.getAll()) {
fieldCounter++;
// Substitute a null value with an empty string. See PIG-2470.
if (field == null) {
fieldStr = null;
mProtoTuple.add("");
continue;
}
fieldStr = field.toString();
// Embedded double quotes are replaced by two double quotes:
fieldStr = fieldStr.replaceAll("[\"]", "\"\"");
// If any field delimiters are in the field, or if we did replace
// any double quotes with a pair of double quotes above,
// or if the string includes a newline character (LF:\n:0x0A)
// or includes a carriage return (CR:\r:0x0D)
// and we are to allow newlines in fields,
// then the entire field must be enclosed in double quotes:
embeddedNewlineIndex = fieldStr.indexOf(LINEFEED);
embeddedCarriageReturn = fieldStr.indexOf(CARRIAGE_RETURN);
if ((fieldStr.indexOf(fieldDelimiter) != -1) ||
(fieldStr.indexOf(DOUBLE_QUOTE) != -1) ||
(multilineTreatment == Multiline.YES) && (embeddedNewlineIndex != -1 || embeddedCarriageReturn != -1)) {
fieldStr = "\"" + fieldStr + "\"";
}
// If requested: replace any lone linefeed (LF, ^J) with a
// carriage return/linefeed pair (CRLF, ^M^J). This is needed
// for Excel to recognize a field-internal new line:
if ((eolTreatment != Linebreaks.NOCHANGE) && (embeddedNewlineIndex != -1)) {
if (eolTreatment == Linebreaks.WINDOWS) {
loneLFDetector.reset(fieldStr);
loneLFDetector.matches();
fieldStr = loneLFDetector.replaceAll("$1\r\n");
} else if (eolTreatment == Linebreaks.UNIX) {
CRLFDetector.reset(fieldStr);
fieldStr = CRLFDetector.replaceAll("\n");
}
}
mProtoTuple.add(fieldStr);
}
// If Windows line breaks are requested, append
// a carriage return (0x0D a.k.a. ^M) to the last field
// so that the row termination will end up being
// \r\n, once the superclass' putNext() method
// is done below:
if ((eolTreatment == Linebreaks.WINDOWS) && (fieldStr != null))
mProtoTuple.set(mProtoTuple.size() - 1, fieldStr + "\r");
Tuple resTuple = tupleMaker.newTuple(mProtoTuple);
super.putNext(resTuple);
}
// ---------------------------------------- LOADING -----------------------------
/* (non-Javadoc)
* @see org.apache.pig.builtin.PigStorage#getNext()
*/
@Override
public Tuple getNext() throws IOException {
// If SKIP_INPUT_HEADER and this is the first input split, skip header record
// We store its value as a string though, so we can compare
// further records to it. If they are the same (this would
// happen if multiple small files each with a header were combined
// into one split), we know to skip the duplicate header record as well.
if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER &&
(splitIndex == 0 || splitIndex == -1)) {
try {
if (!in.nextKeyValue())
return null;
header = ((Text) in.getCurrentValue()).toString();
} catch (InterruptedException e) {
int errCode = 6018;
String errMsg = "Error while reading input";
throw new ExecException(errMsg, errCode,
PigException.REMOTE_ENVIRONMENT, e);
}
}
loadingFirstRecord = false;
mProtoTuple = new ArrayList<Object>();
getNextInQuotedField = false;
boolean evenQuotesSeen = true;
boolean sawEmbeddedRecordDelimiter = false;
byte[] buf = null;
if (!mRequiredColumnsInitialized) {
if (udfContextSignature != null) {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p.getProperty(udfContextSignature));
}
mRequiredColumnsInitialized = true;
}
// Note: we cannot factor out the check for nextKeyValue() being null,
// because that call overwrites buf with the new line, which is
// bad if we have a field with a newline.
try {
int recordLen = 0;
getNextFieldID = 0;
while (sawEmbeddedRecordDelimiter || getNextFieldID == 0) {
Text value = null;
if (sawEmbeddedRecordDelimiter) {
// Deal with pulling more records from the input, because
// a double quoted embedded newline was encountered in a field.
// Save the length of the record so far, plus one byte for the
// record delimiter (usually newline) that's embedded in the field
// we were working on before falling into this branch:
int prevLineLen = recordLen + 1;
// Save previous line (the one with the field that has the newline) in a new array.
// The last byte will be random; we'll fill in the embedded
// record delimiter (usually newline) below:
byte[] prevLineSaved = Arrays.copyOf(buf, prevLineLen);
prevLineSaved[prevLineLen - 1] = RECORD_DEL;
// Read the continuation of the record, unless EOF:
if (!in.nextKeyValue()) {
return null;
}
value = (Text) in.getCurrentValue();
recordLen = value.getLength();
// Grab the continuation's bytes:
buf = value.getBytes();
// Combine the previous line and the continuation into a new array.
// The following copyOf() does half the job: it allocates all the
// space, and also copies the previous line into that space:
byte[] prevLineAndContinuation = Arrays.copyOf(prevLineSaved, prevLineLen + recordLen);
// Now append the continuation. Params: fromBuf, fromStartPos, toBuf, toStartPos, lengthToCopy:
System.arraycopy(buf, 0, prevLineAndContinuation, prevLineLen, recordLen);
// We'll work with the combination now:
buf = prevLineAndContinuation;
// Do the whole record over from the start:
mProtoTuple.clear();
getNextInQuotedField = false;
evenQuotesSeen = true;
getNextFieldID = 0;
recordLen = prevLineAndContinuation.length;
} else {
// Previous record finished cleanly: start with the next record,
// unless EOF:
if (!in.nextKeyValue()) {
return null;
}
value = (Text) in.getCurrentValue();
// if the line is a duplicate header and 'SKIP_INPUT_HEADER' is set, ignore it
// (this might happen if multiple files each with a header are combined into a single split)
if (headerTreatment == Headers.SKIP_INPUT_HEADER && value.toString().equals(header)) {
if (!in.nextKeyValue())
return null;
value = (Text) in.getCurrentValue();
}
buf = value.getBytes();
getNextFieldID = 0;
recordLen = value.getLength();
}
nextTupleSkipChar = false;
ByteBuffer fieldBuffer = ByteBuffer.allocate(recordLen);
sawEmbeddedRecordDelimiter = processOneInRecord(evenQuotesSeen,
buf, recordLen, fieldBuffer);
// The last field is not terminated by a field delimiter,
// but by the end of the record, so we need to add it here.
// The '!sawEmbeddedRecordDelimiter' guard handles the case
// of embedded newlines: there we are still in the middle
// of a field, not at the end of the record:
if (!sawEmbeddedRecordDelimiter)
readField(fieldBuffer, getNextFieldID++);
} // end while
} catch (InterruptedException e) {
int errCode = 6018;
String errMsg = "Error while reading input";
throw new ExecException(errMsg, errCode,
PigException.REMOTE_ENVIRONMENT, e);
}
Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
return t;
}
/*
* Service method for getNext().
* Walks char by char through the input record
* that was previously pulled in by getNext(),
* and fills the fieldBuffer with those chars.
* <p>
* If multilineTreatment is Multiline.YES, then
* the return value indicates whether an embedded
* newline was found in a field, and that newline
* was in a field that opened with a double quote
* that was not closed before the end of the
* record was reached. If multilineTreatment
* is Multiline.NO, then the return value is always false.
* <p>
* A return value of true will cause the calling method
* to continue pulling records from the input stream,
* until a closing quote is found.
* <p>
* Note that the recordReader that delivers records
* to our getNext() method above considers record
* boundaries to be newlines. We therefore never see an actual
* newline character embedded in a field. We just
* run out of record. For Multiline.NO we just take
* such an end of record at face value; the final
* resulting tuple will contain information only up
* to the first newline that was found.
* <p>
* For Multiline.YES, when we run out of record
* in an open double quote, our return of true from
* this method will cause the caller getNext() to
* do its additional readings of records from the
* stream, until the closing double quote is found.
* <p>
*
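* For example (a sketch), the record
*
* 1,"two, ""2""",3
*
* is split into the three fields
*
* 1 | two, "2" | 3
* <p>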
* @param evenQuotesSeen whether an even number of double quotes has been seen in the current field so far
* @param buf the raw bytes of the current record
* @param recordLen the number of valid bytes in buf
* @param fieldBuffer buffer that accumulates the bytes of the current field
* @return true if the record ended inside an unterminated quoted field (only possible with Multiline.YES)
*/
private boolean processOneInRecord(boolean evenQuotesSeen,
byte[] buf, int recordLen,
ByteBuffer fieldBuffer) {
for (int i = 0; i < recordLen; i++) {
if (nextTupleSkipChar) {
nextTupleSkipChar = false;
continue;
}
byte b = buf[i];
if (getNextInQuotedField) {
if (b == DOUBLE_QUOTE) {
// Does a double quote immediately follow?
if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE)) {
fieldBuffer.put(b);
nextTupleSkipChar = true;
continue;
}
evenQuotesSeen = !evenQuotesSeen;
// If the quote is ending the last field in a record,
// set the getNextInQuotedField flag to false,
// so the return statement conditional (see below)
// is false, indicating that we're ready for the next record
if (!evenQuotesSeen && i == recordLen - 1) {
getNextInQuotedField = false;
}
if (evenQuotesSeen) {
fieldBuffer.put(DOUBLE_QUOTE);
}
} else if (!evenQuotesSeen && (b == fieldDelimiter || b == RECORD_DEL)) {
getNextInQuotedField = false;
readField(fieldBuffer, getNextFieldID++);
} else {
fieldBuffer.put(b);
}
} else if (b == DOUBLE_QUOTE) {
// Does a double quote immediately follow?
if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE) && (fieldBuffer.position() != 0)) {
fieldBuffer.put(b);
nextTupleSkipChar = true;
continue;
}
// If we are at the start of a field,
// that entire field is quoted:
getNextInQuotedField = true;
evenQuotesSeen = true;
} else if (b == fieldDelimiter) {
readField(fieldBuffer, getNextFieldID++); // end of the field
} else {
evenQuotesSeen = true;
fieldBuffer.put(b);
}
} // end for
return getNextInQuotedField && (multilineTreatment == Multiline.YES);
}
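// Emit the bytes accumulated in buf as one tuple field, unless a
// pushed-down projection (mRequiredColumns) excludes this field ID;
// the buffer is cleared either way: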
private void readField(ByteBuffer buf, int fieldID) {
if (mRequiredColumns==null || (mRequiredColumns.length>fieldID && mRequiredColumns[fieldID])) {
byte[] bytes = new byte[buf.position()];
buf.rewind();
buf.get(bytes, 0, bytes.length);
mProtoTuple.add(new DataByteArray(bytes));
}
buf.clear();
}
@Override
public void setLocation(String location, Job job) throws IOException {
loadLocation = location;
FileInputFormat.setInputPaths(job, location);
}
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() {
if (loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
return new Bzip2TextInputFormat();
} else {
return new PigTextInputFormat();
}
}
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) {
in = reader;
splitIndex = split.getSplitIndex();
if (headerTreatment == Headers.DEFAULT) {
headerTreatment = Headers.READ_INPUT_HEADER;
}
}
@Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
if (requiredFieldList == null)
return null;
if (requiredFieldList.getFields() != null)
{
int lastColumn = -1;
for (RequiredField rf: requiredFieldList.getFields())
{
if (rf.getIndex()>lastColumn)
{
lastColumn = rf.getIndex();
}
}
mRequiredColumns = new boolean[lastColumn+1];
for (RequiredField rf: requiredFieldList.getFields())
{
if (rf.getIndex()!=-1)
mRequiredColumns[rf.getIndex()] = true;
}
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
try {
p.setProperty(udfContextSignature, ObjectSerializer.serialize(mRequiredColumns));
} catch (Exception e) {
throw new RuntimeException("Cannot serialize mRequiredColumns", e);
}
}
return new RequiredFieldResponse(true);
}
@Override
public void setUDFContextSignature(String signature) {
this.udfContextSignature = signature;
}
@Override
public void setStoreFuncUDFContextSignature(String signature) {
this.udfContextSignature = signature;
}
@Override
public List<OperatorSet> getFeatures() {
return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
}
}