blob: cff370555f4040dbc09df6e880eee7910f29da40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.benchmark.byTask.tasks;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.DocMaker;
import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.benchmark.byTask.utils.StreamUtils;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
/**
* A task which writes documents, one line per document. Each line is in the following format: title
* <TAB> date <TAB> body. The output of this task can be consumed by {@link
* org.apache.lucene.benchmark.byTask.feeds.LineDocSource} and is intended to save the IO overhead
* of opening a file per document to be indexed.
*
* <p>The format of the output is set according to the output file extension. Compression is
* recommended when the output file is expected to be large. See info on file extensions in {@link
* org.apache.lucene.benchmark.byTask.utils.StreamUtils.Type}
*
* <p>Supports the following parameters:
*
* <ul>
* <li><b>line.file.out</b> - the name of the file to write the output to. That parameter is
* mandatory. <b>NOTE:</b> the file is re-created.
* <li><b>line.fields</b> - which fields should be written in each line. (optional, default:
* {@link #DEFAULT_FIELDS}).
* <li><b>sufficient.fields</b> - list of field names, separated by comma, which, if all of them
* are missing, the document will be skipped. For example, to require that at least one of
* f1,f2 is not empty, specify: "f1,f2" in this field. To specify that no field is required,
* i.e. that even empty docs should be emitted, specify <b>","</b>. (optional, default: {@link
* #DEFAULT_SUFFICIENT_FIELDS}).
* </ul>
*
* <b>NOTE:</b> this class is not thread-safe and if used by multiple threads the output is
* unspecified (as all will write to the same output file in a non-synchronized way).
*/
public class WriteLineDocTask extends PerfTask {
public static final String FIELDS_HEADER_INDICATOR = "FIELDS_HEADER_INDICATOR###";
public static final char SEP = '\t';
/** Fields to be written by default */
public static final String[] DEFAULT_FIELDS =
new String[] {
DocMaker.TITLE_FIELD, DocMaker.DATE_FIELD, DocMaker.BODY_FIELD,
};
/** Default fields which at least one of them is required to not skip the doc. */
public static final String DEFAULT_SUFFICIENT_FIELDS =
DocMaker.TITLE_FIELD + ',' + DocMaker.BODY_FIELD;
private int docSize = 0;
protected final String fname;
private final PrintWriter lineFileOut;
private final DocMaker docMaker;
private final ThreadLocal<StringBuilder> threadBuffer = new ThreadLocal<>();
private final ThreadLocal<Matcher> threadNormalizer = new ThreadLocal<>();
private final String[] fieldsToWrite;
private final boolean[] sufficientFields;
private final boolean checkSufficientFields;
public WriteLineDocTask(PerfRunData runData) throws Exception {
super(runData);
Config config = runData.getConfig();
fname = config.get("line.file.out", null);
if (fname == null) {
throw new IllegalArgumentException("line.file.out must be set");
}
OutputStream out = StreamUtils.outputStream(Paths.get(fname));
lineFileOut =
new PrintWriter(
new BufferedWriter(
new OutputStreamWriter(out, StandardCharsets.UTF_8), StreamUtils.BUFFER_SIZE));
docMaker = runData.getDocMaker();
// init fields
String f2r = config.get("line.fields", null);
if (f2r == null) {
fieldsToWrite = DEFAULT_FIELDS;
} else {
if (f2r.indexOf(SEP) >= 0) {
throw new IllegalArgumentException(
"line.fields " + f2r + " should not contain the separator char: " + SEP);
}
fieldsToWrite = f2r.split(",");
}
// init sufficient fields
sufficientFields = new boolean[fieldsToWrite.length];
String suff = config.get("sufficient.fields", DEFAULT_SUFFICIENT_FIELDS);
if (",".equals(suff)) {
checkSufficientFields = false;
} else {
checkSufficientFields = true;
HashSet<String> sf = new HashSet<>(Arrays.asList(suff.split(",")));
for (int i = 0; i < fieldsToWrite.length; i++) {
if (sf.contains(fieldsToWrite[i])) {
sufficientFields[i] = true;
}
}
}
writeHeader(lineFileOut);
}
/** Write header to the lines file - indicating how to read the file later. */
protected void writeHeader(PrintWriter out) {
StringBuilder sb = threadBuffer.get();
if (sb == null) {
sb = new StringBuilder();
threadBuffer.set(sb);
}
sb.setLength(0);
sb.append(FIELDS_HEADER_INDICATOR);
for (String f : fieldsToWrite) {
sb.append(SEP).append(f);
}
out.println(sb.toString());
}
@Override
protected String getLogMessage(int recsCount) {
return "Wrote " + recsCount + " line docs";
}
@Override
public int doLogic() throws Exception {
Document doc = docSize > 0 ? docMaker.makeDocument(docSize) : docMaker.makeDocument();
Matcher matcher = threadNormalizer.get();
if (matcher == null) {
matcher = Pattern.compile("[\t\r\n]+").matcher("");
threadNormalizer.set(matcher);
}
StringBuilder sb = threadBuffer.get();
if (sb == null) {
sb = new StringBuilder();
threadBuffer.set(sb);
}
sb.setLength(0);
boolean sufficient = !checkSufficientFields;
for (int i = 0; i < fieldsToWrite.length; i++) {
IndexableField f = doc.getField(fieldsToWrite[i]);
String text = f == null ? "" : matcher.reset(f.stringValue()).replaceAll(" ").trim();
sb.append(text).append(SEP);
sufficient |= text.length() > 0 && sufficientFields[i];
}
if (sufficient) {
sb.setLength(sb.length() - 1); // remove redundant last separator
// lineFileOut is a PrintWriter, which synchronizes internally in println.
lineFileOut(doc).println(sb.toString());
}
return 1;
}
/** Selects output line file by written doc. Default: original output line file. */
protected PrintWriter lineFileOut(Document doc) {
return lineFileOut;
}
@Override
public void close() throws Exception {
lineFileOut.close();
super.close();
}
/**
* Set the params (docSize only)
*
* @param params docSize, or 0 for no limit.
*/
@Override
public void setParams(String params) {
super.setParams(params);
docSize = (int) Float.parseFloat(params);
}
@Override
public boolean supportsParams() {
return true;
}
}