blob: 6fd06266e4ebebecb642f81f831d35c3706929b0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.benchmark.byTask.feeds;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Locale;
import org.apache.lucene.benchmark.Constants;
import org.apache.lucene.benchmark.byTask.utils.Config;
/**
* A {@link ContentSource} reading from the Reuters collection.
* <p>
* Config properties:
* <ul>
* <li><b>work.dir</b> - path to the root of docs and indexes dirs (default
* <b>work</b>).
* <li><b>docs.dir</b> - path to the docs dir (default <b>reuters-out</b>).
* </ul>
*/
public class ReutersContentSource extends ContentSource {
private static final class DateFormatInfo {
DateFormat df;
ParsePosition pos;
}
private ThreadLocal<DateFormatInfo> dateFormat = new ThreadLocal<>();
private Path dataDir = null;
private ArrayList<Path> inputFiles = new ArrayList<>();
private int[] docCountArr;
private volatile boolean docCountArrCreated;
@Override
public void setConfig(Config config) {
super.setConfig(config);
Path workDir = Paths.get(config.get("work.dir", "work"));
String d = config.get("docs.dir", "reuters-out");
dataDir = Paths.get(d);
if (!dataDir.isAbsolute()) {
dataDir = workDir.resolve(d);
}
inputFiles.clear();
try {
collectFiles(dataDir, inputFiles);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (inputFiles.size() == 0) {
throw new RuntimeException("No txt files in dataDir: "+dataDir.toAbsolutePath());
}
}
private synchronized DateFormatInfo getDateFormatInfo() {
DateFormatInfo dfi = dateFormat.get();
if (dfi == null) {
dfi = new DateFormatInfo();
// date format: 30-MAR-1987 14:22:36.87
dfi.df = new SimpleDateFormat("dd-MMM-yyyy kk:mm:ss.SSS",Locale.ENGLISH);
dfi.df.setLenient(true);
dfi.pos = new ParsePosition(0);
dateFormat.set(dfi);
}
return dfi;
}
private Date parseDate(String dateStr) {
DateFormatInfo dfi = getDateFormatInfo();
dfi.pos.setIndex(0);
dfi.pos.setErrorIndex(-1);
return dfi.df.parse(dateStr.trim(), dfi.pos);
}
@Override
public void close() throws IOException {
// TODO implement?
}
@Override
public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
if (docCountArrCreated == false) {
docCountArrInit();
}
int threadIndexSize = Thread.currentThread().getName().length();
int parallelTaskThreadSize = Constants.PARALLEL_TASK_THREAD_NAME_PREFIX.length();
int threadIndex = 0;
if (docCountArr.length > 1) {
// Extract ThreadIndex from unique ThreadName which is set with '"ParallelTaskThread-"+index',
// in TaskSequence.java's doParallelTasks()
threadIndex =
Integer.parseInt(
Thread.currentThread()
.getName()
.substring(parallelTaskThreadSize + 1, threadIndexSize));
}
assert (threadIndex >= 0 && threadIndex < docCountArr.length)
: "Please check threadIndex or docCountArr length";
int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length;
int inFileSize = inputFiles.size();
// Modulo Operator covers all three possible senarios i.e. 1. If inputFiles.size() < Num Of
// Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads
int fileIndex = stride % inFileSize;
int iteration = stride / inFileSize;
docCountArr[threadIndex]++;
Path f = inputFiles.get(fileIndex);
String name = f.toRealPath() + "_" + iteration;
try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
// First line is the date, 3rd is the title, rest is body
String dateStr = reader.readLine();
reader.readLine();// skip an empty line
String title = reader.readLine();
reader.readLine();// skip an empty line
StringBuilder bodyBuf = new StringBuilder(1024);
String line = null;
while ((line = reader.readLine()) != null) {
bodyBuf.append(line).append(' ');
}
addBytes(Files.size(f));
Date date = parseDate(dateStr.trim());
docData.clear();
docData.setName(name);
docData.setBody(bodyBuf.toString());
docData.setTitle(title);
docData.setDate(date);
return docData;
}
}
@Override
public synchronized void resetInputs() throws IOException {
super.resetInputs();
}
private synchronized void docCountArrInit() {
if (docCountArrCreated == false) {
docCountArr = new int[getConfig().getNumThreads()];
docCountArrCreated = true;
}
}
}