/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.ltsv;

import io.netty.buffer.DrillBuf;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
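
/**
 * A RecordReader for LTSV (Labeled Tab-Separated Values) files.
 * <p>
 * Each input line is one record: tab-separated fields, each of the form
 * "label:value". Every value is emitted as a VARCHAR column named after its
 * label. An illustrative line (labels and values below are made up, not
 * taken from real data) looks like:
 * <pre>
 * time:2017-04-01T00:00:00Z\thost:example.com\tstatus:200
 * </pre>
 */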
public class LTSVRecordReader extends AbstractRecordReader {

  private static final Logger logger = LoggerFactory.getLogger(LTSVRecordReader.class);

  private static final int MAX_RECORDS_PER_BATCH = 8096;

  private final String inputPath;
  private final InputStream fsStream;
  private final BufferedReader reader;
  private DrillBuf buffer;
  private VectorContainerWriter writer;
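
  /**
   * Opens the (possibly compressed) input file as UTF-8 and records the
   * projected columns. Any I/O failure is rethrown as a Drill UserException
   * that carries the file path.
   */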
  public LTSVRecordReader(FragmentContext fragmentContext, Path path, DrillFileSystem fileSystem,
                          List<SchemaPath> columns) throws OutOfMemoryException {
    this.inputPath = path.toUri().getPath();
    try {
      this.fsStream = fileSystem.openPossiblyCompressedStream(path);
      this.reader = new BufferedReader(new InputStreamReader(fsStream, StandardCharsets.UTF_8));
      this.buffer = fragmentContext.getManagedBuffer();
      setColumns(columns);
    } catch (IOException e) {
      throw UserException.dataReadError(e)
        .message(String.format("Failed to open input file: %s", inputPath))
        .build(logger);
    }
  }
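
  /**
   * For a star query, project only the star column; otherwise keep the
   * explicitly requested columns in order, dropping duplicates.
   */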
  @Override
  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) {
    Set<SchemaPath> transformed = new LinkedHashSet<>();
    if (!isStarQuery()) {
      transformed.addAll(projected);
    } else {
      transformed.add(SchemaPath.STAR_COLUMN);
    }
    return transformed;
  }
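
  /**
   * Creates the vector writer that {@link #next()} fills on each batch.
   */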
  @Override
  public void setup(final OperatorContext context, final OutputMutator output) {
    this.writer = new VectorContainerWriter(output);
  }
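
  /**
   * Reads up to MAX_RECORDS_PER_BATCH lines, parses each into label/value
   * pairs, and writes the values of projected columns as VARCHARs.
   *
   * @return the number of records written to the current batch
   */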
  @Override
  public int next() {
    this.writer.allocate();
    this.writer.reset();
    int recordCount = 0;
    try {
      BaseWriter.MapWriter map = this.writer.rootAsMap();
      String line;
      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
        // Skip empty lines
        if (line.trim().isEmpty()) {
          continue;
        }
        // Split the line into "label:value" pairs, keeping only projected columns.
        List<String[]> fields = new ArrayList<>();
        for (String field : line.split("\t")) {
          int index = field.indexOf(":");
          if (index <= 0) {
            throw new ParseException(String.format("Invalid LTSV format: %s\n%d:%s", inputPath, recordCount + 1, line), 0);
          }
          String fieldName = field.substring(0, index);
          String fieldValue = field.substring(index + 1);
          if (selectedColumn(fieldName)) {
            fields.add(new String[]{fieldName, fieldValue});
          }
        }
        if (fields.isEmpty()) {
          continue;
        }
        this.writer.setPosition(recordCount);
        map.start();
        for (String[] field : fields) {
          // Copy the UTF-8 bytes of the value into the managed buffer and
          // write them as a VARCHAR into the column named by the label.
          byte[] bytes = field[1].getBytes(StandardCharsets.UTF_8);
          this.buffer = this.buffer.reallocIfNeeded(bytes.length);
          this.buffer.setBytes(0, bytes, 0, bytes.length);
          map.varChar(field[0]).writeVarChar(0, bytes.length, buffer);
        }
        map.end();
        recordCount++;
      }
      this.writer.setValueCount(recordCount);
      return recordCount;
    } catch (final Exception e) {
      String msg = String.format("Failure while reading messages from %s. Record reader was at record: %d", inputPath, recordCount + 1);
      throw UserException.dataReadError(e)
        .message(msg)
        .build(logger);
    }
  }
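
  /**
   * Returns true if the given field is projected, i.e. the query is a star
   * query or the field name matches the root of a requested column.
   */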
  private boolean selectedColumn(String fieldName) {
    for (SchemaPath col : getColumns()) {
      if (col.equals(SchemaPath.STAR_COLUMN) || col.getRootSegment().getPath().equals(fieldName)) {
        return true;
      }
    }
    return false;
  }
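
  /**
   * Releases the reader and the underlying file stream.
   */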
  @Override
  public void close() throws Exception {
    AutoCloseables.close(reader, fsStream);
  }
}