blob: 2cc766211e346728c6f4f5e10a9769bdaf2dfec4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.loader;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.update.*;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.internal.csv.CSVStrategy;
import org.apache.solr.internal.csv.CSVParser;
import org.apache.commons.io.IOUtils;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.List;
import java.util.HashMap;
import java.util.Iterator;
import java.io.*;
public abstract class CSVLoaderBase extends ContentStreamLoader {
public static final String SEPARATOR="separator";
public static final String FIELDNAMES="fieldnames";
public static final String HEADER="header";
public static final String SKIP="skip";
public static final String SKIPLINES="skipLines";
public static final String MAP="map";
public static final String TRIM="trim";
public static final String EMPTY="keepEmpty";
public static final String SPLIT="split";
public static final String ENCAPSULATOR="encapsulator";
public static final String ESCAPE="escape";
public static final String OVERWRITE="overwrite";
public static final String LITERALS_PREFIX = "literal.";
public static final String ROW_ID = "rowid";
public static final String ROW_ID_OFFSET = "rowidOffset";
private static Pattern colonSplit = Pattern.compile(":");
private static Pattern commaSplit = Pattern.compile(",");
final SolrParams params;
final CSVStrategy strategy;
protected final UpdateRequestProcessor processor;
// hashmap to save any literal fields and their values
HashMap <String, String> literals;
String[] fieldnames;
CSVLoaderBase.FieldAdder[] adders;
String rowId = null;// if not null, add a special field by the name given with the line number/row id as the value
int rowIdOffset = 0; //add to line/rowid before creating the field
int skipLines; // number of lines to skip at start of file
protected final AddUpdateCommand templateAdd;
/** Add a field to a document unless it's zero length.
* The FieldAdder hierarchy handles all the complexity of
* further transforming or splitting field values to keep the
* main logic loop clean. All implementations of add() must be
* MT-safe!
*/
private class FieldAdder {
void add(SolrInputDocument doc, int line, int column, String val) {
if (val.length() > 0) {
doc.addField(fieldnames[column],val);
}
}
}
/** add zero length fields */
private class FieldAdderEmpty extends CSVLoaderBase.FieldAdder {
@Override
void add(SolrInputDocument doc, int line, int column, String val) {
doc.addField(fieldnames[column],val);
}
}
/** trim fields */
private class FieldTrimmer extends CSVLoaderBase.FieldAdder {
private final CSVLoaderBase.FieldAdder base;
FieldTrimmer(CSVLoaderBase.FieldAdder base) { this.base=base; }
@Override
void add(SolrInputDocument doc, int line, int column, String val) {
base.add(doc, line, column, val.trim());
}
}
/** map a single value.
* for just a couple of mappings, this is probably faster than
* using a HashMap.
*/
private class FieldMapperSingle extends CSVLoaderBase.FieldAdder {
private final String from;
private final String to;
private final CSVLoaderBase.FieldAdder base;
FieldMapperSingle(String from, String to, CSVLoaderBase.FieldAdder base) {
this.from=from;
this.to=to;
this.base=base;
}
@Override
void add(SolrInputDocument doc, int line, int column, String val) {
if (from.equals(val)) val=to;
base.add(doc,line,column,val);
}
}
/** Split a single value into multiple values based on
* a CSVStrategy.
*/
private class FieldSplitter extends CSVLoaderBase.FieldAdder {
private final CSVStrategy strategy;
private final CSVLoaderBase.FieldAdder base;
FieldSplitter(CSVStrategy strategy, CSVLoaderBase.FieldAdder base) {
this.strategy = strategy;
this.base = base;
}
@Override
void add(SolrInputDocument doc, int line, int column, String val) {
CSVParser parser = new CSVParser(new StringReader(val), strategy);
try {
String[] vals = parser.getLine();
if (vals!=null) {
for (String v: vals) base.add(doc,line,column,v);
} else {
base.add(doc,line,column,val);
}
} catch (IOException e) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,e);
}
}
}
String errHeader="CSVLoader:";
protected CSVLoaderBase(SolrQueryRequest req, UpdateRequestProcessor processor) {
this.processor = processor;
this.params = req.getParams();
this.literals = new HashMap<>();
templateAdd = new AddUpdateCommand(req);
templateAdd.overwrite=params.getBool(OVERWRITE,true);
templateAdd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
strategy = new CSVStrategy(',', '"', CSVStrategy.COMMENTS_DISABLED, CSVStrategy.ESCAPE_DISABLED, false, false, false, true, "\n");
String sep = params.get(SEPARATOR);
if (sep!=null) {
if (sep.length()!=1) throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Invalid separator:'"+sep+"'");
strategy.setDelimiter(sep.charAt(0));
}
String encapsulator = params.get(ENCAPSULATOR);
if (encapsulator!=null) {
if (encapsulator.length()!=1) throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Invalid encapsulator:'"+encapsulator+"'");
}
String escape = params.get(ESCAPE);
if (escape!=null) {
if (escape.length()!=1) throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Invalid escape:'"+escape+"'");
}
rowId = params.get(ROW_ID);
rowIdOffset = params.getInt(ROW_ID_OFFSET, 0);
// if only encapsulator or escape is set, disable the other escaping mechanism
if (encapsulator == null && escape != null) {
strategy.setEncapsulator( CSVStrategy.ENCAPSULATOR_DISABLED);
strategy.setEscape(escape.charAt(0));
} else {
if (encapsulator != null) {
strategy.setEncapsulator(encapsulator.charAt(0));
}
if (escape != null) {
char ch = escape.charAt(0);
strategy.setEscape(ch);
if (ch == '\\') {
// If the escape is the standard backslash, then also enable
// unicode escapes (it's harmless since 'u' would not otherwise
// be escaped.
strategy.setUnicodeEscapeInterpretation(true);
}
}
}
String fn = params.get(FIELDNAMES);
fieldnames = fn != null ? commaSplit.split(fn,-1) : null;
Boolean hasHeader = params.getBool(HEADER);
skipLines = params.getInt(SKIPLINES,0);
if (fieldnames==null) {
if (null == hasHeader) {
// assume the file has the headers if they aren't supplied in the args
hasHeader=true;
} else if (!hasHeader) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"CSVLoader: must specify fieldnames=<fields>* or header=true");
}
} else {
// if the fieldnames were supplied and the file has a header, we need to
// skip over that header.
if (hasHeader!=null && hasHeader) skipLines++;
prepareFields();
}
}
/** create the FieldAdders that control how each field is indexed */
void prepareFields() {
// Possible future optimization: for really rapid incremental indexing
// from a POST, one could cache all of this setup info based on the params.
// The link from FieldAdder to this would need to be severed for that to happen.
adders = new CSVLoaderBase.FieldAdder[fieldnames.length];
String skipStr = params.get(SKIP);
List<String> skipFields = skipStr==null ? null : StrUtils.splitSmart(skipStr,',');
CSVLoaderBase.FieldAdder adder = new CSVLoaderBase.FieldAdder();
CSVLoaderBase.FieldAdder adderKeepEmpty = new CSVLoaderBase.FieldAdderEmpty();
for (int i=0; i<fieldnames.length; i++) {
String fname = fieldnames[i];
// to skip a field, leave the entries in fields and addrs null
if (fname.length()==0 || (skipFields!=null && skipFields.contains(fname))) continue;
boolean keepEmpty = params.getFieldBool(fname,EMPTY,false);
adders[i] = keepEmpty ? adderKeepEmpty : adder;
// Order that operations are applied: split -> trim -> map -> add
// so create in reverse order.
// Creation of FieldAdders could be optimized and shared among fields
String[] fmap = params.getFieldParams(fname,MAP);
if (fmap!=null) {
for (String mapRule : fmap) {
String[] mapArgs = colonSplit.split(mapRule,-1);
if (mapArgs.length!=2)
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Map rules must be of the form 'from:to' ,got '"+mapRule+"'");
adders[i] = new CSVLoaderBase.FieldMapperSingle(mapArgs[0], mapArgs[1], adders[i]);
}
}
if (params.getFieldBool(fname,TRIM,false)) {
adders[i] = new CSVLoaderBase.FieldTrimmer(adders[i]);
}
if (params.getFieldBool(fname,SPLIT,false)) {
String sepStr = params.getFieldParam(fname,SEPARATOR);
char fsep = sepStr==null || sepStr.length()==0 ? ',' : sepStr.charAt(0);
String encStr = params.getFieldParam(fname,ENCAPSULATOR);
char fenc = encStr==null || encStr.length()==0 ? (char)-2 : encStr.charAt(0);
String escStr = params.getFieldParam(fname,ESCAPE);
char fesc = escStr==null || escStr.length()==0 ? CSVStrategy.ESCAPE_DISABLED : escStr.charAt(0);
CSVStrategy fstrat = new CSVStrategy
(fsep, fenc, CSVStrategy.COMMENTS_DISABLED, fesc, false, false, false, false, "\n");
adders[i] = new CSVLoaderBase.FieldSplitter(fstrat, adders[i]);
}
}
// look for any literal fields - literal.foo=xyzzy
Iterator<String> paramNames = params.getParameterNamesIterator();
while (paramNames.hasNext()) {
String pname = paramNames.next();
if (!pname.startsWith(LITERALS_PREFIX)) continue;
String name = pname.substring(LITERALS_PREFIX.length());
literals.put(name, params.get(pname));
}
}
private void input_err(String msg, String[] line, int lineno) {
StringBuilder sb = new StringBuilder();
sb.append(errHeader).append(", line=").append(lineno).append(",").append(msg).append("\n\tvalues={");
for (String val: line) {
sb.append("'").append(val).append("',"); }
sb.append('}');
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,sb.toString());
}
private void input_err(String msg, String[] lines, int lineNo, Throwable e) {
StringBuilder sb = new StringBuilder();
sb.append(errHeader).append(", line=").append(lineNo).append(",").append(msg).append("\n\tvalues={");
if (lines != null) {
for (String val : lines) {
sb.append("'").append(val).append("',");
}
} else {
sb.append("NO LINES AVAILABLE");
}
sb.append('}');
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,sb.toString(), e);
}
/** load the CSV input */
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws IOException {
errHeader = "CSVLoader: input=" + stream.getSourceInfo();
Reader reader = null;
try {
reader = stream.getReader();
if (skipLines>0) {
if (!(reader instanceof BufferedReader)) {
reader = new BufferedReader(reader);
}
BufferedReader r = (BufferedReader)reader;
for (int i=0; i<skipLines; i++) {
r.readLine();
}
}
CSVParser parser = new CSVParser(reader, strategy);
// parse the fieldnames from the header of the file
if (fieldnames==null) {
fieldnames = parser.getLine();
if (fieldnames==null) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Expected fieldnames in CSV input");
}
prepareFields();
}
// read the rest of the CSV file
for(;;) {
int line = parser.getLineNumber(); // for error reporting in MT mode
String[] vals = null;
try {
vals = parser.getLine();
} catch (IOException e) {
//Catch the exception and rethrow it with more line information
input_err("can't read line: " + line, null, line, e);
}
if (vals==null) break;
if (vals.length != fieldnames.length) {
input_err("expected "+fieldnames.length+" values but got "+vals.length, vals, line);
}
addDoc(line,vals);
}
} finally{
if (reader != null) {
IOUtils.closeQuietly(reader);
}
}
}
/** called for each line of values (document) */
public abstract void addDoc(int line, String[] vals) throws IOException;
/** this must be MT safe... may be called concurrently from multiple threads. */
protected void doAdd(int line, String[] vals, SolrInputDocument doc, AddUpdateCommand template) throws IOException {
// the line number is passed for error reporting in MT mode as well as for optional rowId.
// first, create the lucene document
for (int i=0; i<vals.length; i++) {
if (adders[i]==null) continue; // skip this field
String val = vals[i];
adders[i].add(doc, line, i, val);
}
// add any literals
for (Map.Entry<String, String> entry : literals.entrySet()) {
doc.addField(entry.getKey(), entry.getValue());
}
if (rowId != null){
doc.addField(rowId, line + rowIdOffset);
}
template.solrDoc = doc;
processor.processAdd(template);
}
}