/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage;

import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.tajo.catalog.CatalogConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
import org.apache.tajo.util.Bytes;

import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;

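/**
 * CSVFile provides a {@link FileAppender} and a {@link FileScanner} for
 * delimiter-separated text tables. Rows are terminated by a line feed;
 * both sides support optional compression codecs and a pluggable
 * {@link SerializerDeserializer} for column values.
 */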
public class CSVFile {
public static final byte LF = '\n';
public static final int EOF = -1;
private static final Log LOG = LogFactory.getLog(CSVFile.class);
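/**
 * Writes tuples as delimiter-separated text. Serialized rows are collected
 * in an in-memory buffer and flushed to the underlying stream once the
 * buffer exceeds BUFFER_SIZE bytes.
 */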
public static class CSVAppender extends FileAppender {
private final TableMeta meta;
private final Schema schema;
private final int columnNum;
private final FileSystem fs;
private FSDataOutputStream fos;
private DataOutputStream outputStream;
private CompressionOutputStream deflateFilter;
private char delimiter;
private TableStatistics stats = null;
private Compressor compressor;
private CompressionCodecFactory codecFactory;
private CompressionCodec codec;
private Path compressedPath;
private byte[] nullChars;
private static final int BUFFER_SIZE = 128 * 1024;
private int bufferedBytes = 0;
private long pos = 0;
private boolean isShuffle;
private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
private SerializerDeserializer serde;
public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
super(conf, schema, meta, path);
this.fs = path.getFileSystem(conf);
this.meta = meta;
this.schema = schema;
this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(CatalogConstants.CSVFILE_DELIMITER,
CatalogConstants.CSVFILE_DELIMITER_DEFAULT)).charAt(0);
this.columnNum = schema.size();
String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(CatalogConstants.CSVFILE_NULL));
if (StringUtils.isEmpty(nullCharacters)) {
nullChars = NullDatum.get().asTextBytes();
} else {
nullChars = nullCharacters.getBytes();
}
}
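/**
 * Opens the output stream, wrapping it with a compression codec when the
 * table meta specifies one, and instantiates the configured serializer.
 */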
@Override
public void init() throws IOException {
if (!fs.exists(path.getParent())) {
throw new FileNotFoundException("Parent directory does not exist: " + path.getParent());
}
// Determine whether this appender writes an intermediate shuffle file.
String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
isShuffle = enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase());
String codecName = this.meta.getOption(CatalogConstants.COMPRESSION_CODEC);
if(!StringUtils.isEmpty(codecName)){
codecFactory = new CompressionCodecFactory(conf);
codec = codecFactory.getCodecByClassName(codecName);
compressor = CodecPool.getCompressor(codec);
if (compressor != null) compressor.reset(); // the built-in gzip codec returns a null compressor
String extension = codec.getDefaultExtension();
compressedPath = path.suffix(extension);
if (fs.exists(compressedPath)) {
throw new AlreadyExistsStorageException(compressedPath);
}
fos = fs.create(compressedPath);
deflateFilter = codec.createOutputStream(fos, compressor);
outputStream = new DataOutputStream(deflateFilter);
} else {
if (fs.exists(path)) {
throw new AlreadyExistsStorageException(path);
}
fos = fs.create(path);
outputStream = new DataOutputStream(new BufferedOutputStream(fos));
}
if (enabledStats) {
this.stats = new TableStatistics(this.schema);
}
try {
String serdeClass = this.meta.getOption(CatalogConstants.CSVFILE_SERDE,
TextSerializerDeserializer.class.getName());
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
}
os.reset();
pos = fos.getPos();
bufferedBytes = 0;
super.init();
}
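/**
 * Serializes each column of the tuple, separated by the delimiter and
 * terminated by LF, into the row buffer; flushes once the buffer is full.
 */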
@Override
public void addTuple(Tuple tuple) throws IOException {
Datum datum;
int rowBytes = 0;
for (int i = 0; i < columnNum; i++) {
datum = tuple.get(i);
rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
if(columnNum - 1 > i){
os.write((byte) delimiter);
rowBytes += 1;
}
if (isShuffle) {
// Collect per-column min/max statistics; used only for intermediate shuffle files.
stats.analyzeField(i, datum);
}
}
os.write(LF);
rowBytes += 1;
pos += rowBytes;
bufferedBytes += rowBytes;
if(bufferedBytes > BUFFER_SIZE){
flushBuffer();
}
// Update row count statistics.
if (enabledStats) {
stats.incrementRow();
}
}
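// Writes any buffered row bytes to the output stream and resets the buffer.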
private void flushBuffer() throws IOException {
if(os.getLength() > 0) {
os.writeTo(outputStream);
os.reset();
bufferedBytes = 0;
}
}
@Override
public long getOffset() throws IOException {
return pos;
}
@Override
public void flush() throws IOException {
flushBuffer();
outputStream.flush();
}
@Override
public void close() throws IOException {
try {
flush();
// Record the final byte count in the statistics.
if (enabledStats) {
stats.setNumBytes(getOffset());
}
if(deflateFilter != null) {
deflateFilter.finish();
deflateFilter.resetState();
deflateFilter = null;
}
os.close();
} finally {
IOUtils.cleanup(LOG, fos);
if (compressor != null) {
CodecPool.returnCompressor(compressor);
compressor = null;
}
}
}
@Override
public TableStats getStats() {
if (enabledStats) {
return stats.getTableStat();
} else {
return null;
}
}
public boolean isCompress() {
return compressor != null;
}
public String getExtension() {
return codec != null ? codec.getDefaultExtension() : "";
}
}
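/**
 * Reads delimiter-separated text fragments page by page. Each call to
 * page() buffers up to DEFAULT_PAGE_SIZE bytes of rows and records their
 * offsets, so next() can materialize tuples lazily and seek() can jump to
 * a previously observed row offset (uncompressed files only).
 */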
public static class CSVScanner extends FileScanner implements SeekableScanner {
public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
throws IOException {
super(conf, schema, meta, fragment);
factory = new CompressionCodecFactory(conf);
codec = factory.getCodec(fragment.getPath());
if (codec == null || codec instanceof SplittableCompressionCodec) {
splittable = true;
}
// Delimiter character, unescaped from the table meta.
String delim = meta.getOption(CatalogConstants.CSVFILE_DELIMITER, CatalogConstants.CSVFILE_DELIMITER_DEFAULT);
this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(CatalogConstants.CSVFILE_NULL));
if (StringUtils.isEmpty(nullCharacters)) {
nullChars = NullDatum.get().asTextBytes();
} else {
nullChars = nullCharacters.getBytes();
}
}
private static final int DEFAULT_PAGE_SIZE = 256 * 1024;
private char delimiter;
private FileSystem fs;
private FSDataInputStream fis;
private InputStream is; // decompressed stream
private CompressionCodecFactory factory;
private CompressionCodec codec;
private Decompressor decompressor;
private Seekable filePosition;
private boolean splittable = false;
private long startOffset, end, pos;
private int currentIdx = 0, validIdx = 0, recordCount = 0;
private int[] targetColumnIndexes;
private boolean eof = false;
private final byte[] nullChars;
private SplitLineReader reader;
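// Parallel lists describing the rows currently buffered in 'buffer': the
// absolute file offset of each row, its byte length, and its start
// position within the buffer.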
private ArrayList<Long> fileOffsets = new ArrayList<Long>();
private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
private SerializerDeserializer serde;
@Override
public void init() throws IOException {
// FileFragment information
if(fs == null) {
fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
}
if(fis == null) fis = fs.open(fragment.getPath());
recordCount = 0;
pos = startOffset = fragment.getStartKey();
end = startOffset + fragment.getEndKey();
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
fis, decompressor, startOffset, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
reader = new CompressedSplitLineReader(cIn, conf, null);
startOffset = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
is = cIn;
} else {
is = new DataInputStream(codec.createInputStream(fis, decompressor));
reader = new SplitLineReader(is, null);
filePosition = fis;
}
} else {
fis.seek(startOffset);
filePosition = fis;
is = fis;
reader = new SplitLineReader(is, null);
}
if (targets == null) {
targets = schema.toArray();
}
targetColumnIndexes = new int[targets.length];
for (int i = 0; i < targets.length; i++) {
targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
}
try {
String serdeClass = this.meta.getOption(CatalogConstants.CSVFILE_SERDE,
TextSerializerDeserializer.class.getName());
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
}
super.init();
Arrays.sort(targetColumnIndexes);
if (LOG.isDebugEnabled()) {
LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
"," + fs.getFileStatus(fragment.getPath()).getLen());
}
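// If the split does not begin at the file head, skip the partial first
// line; it belongs to (and is read by) the preceding split.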
if (startOffset != 0) {
pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
}
eof = false;
page();
}
private int maxBytesToConsume(long pos) {
return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long fragmentable() throws IOException {
return end - getFilePosition();
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompress()) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
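// Fills the page buffer with whole lines until DEFAULT_PAGE_SIZE bytes are
// buffered, the split is exhausted, or EOF is reached, tracking per-row
// offsets for next() and seek().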
private void page() throws IOException {
// Reset paging state and per-row index lists.
currentIdx = 0;
validIdx = 0;
int currentBufferPos = 0;
int bufferedSize = 0;
buffer.reset();
startOffsets.clear();
rowLengthList.clear();
fileOffsets.clear();
if(eof) {
return;
}
while (DEFAULT_PAGE_SIZE >= bufferedSize){
int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
if(ret == 0){
break;
} else {
fileOffsets.add(pos);
pos += ret;
startOffsets.add(currentBufferPos);
currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
bufferedSize += ret;
validIdx++;
recordCount++;
}
if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
eof = true;
break;
}
}
if (tableStats != null) {
tableStats.setReadBytes(pos - startOffset);
tableStats.setNumRows(recordCount);
}
}
@Override
public float getProgress() {
try {
if(eof) {
return 1.0f;
}
long filePos = getFilePosition();
if (startOffset == filePos) {
return 0.0f;
} else {
long readBytes = filePos - startOffset;
long remainingBytes = Math.max(end - filePos, 0);
return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes));
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
return 0.0f;
}
}
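// Returns the next tuple from the page buffer, paging in more rows when the
// buffer is exhausted; returns null at the end of the fragment.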
@Override
public Tuple next() throws IOException {
try {
if (currentIdx == validIdx) {
if (eof) {
return null;
} else {
page();
if(currentIdx == validIdx){
return null;
}
}
}
long offset = -1;
if(!isCompress()){
offset = fileOffsets.get(currentIdx);
}
byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
currentIdx++;
return new LazyTuple(schema, cells, offset, nullChars, serde);
} catch (Throwable t) {
LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
LOG.error("Tuple list current index: " + currentIdx, t);
throw new IOException(t);
}
}
private boolean isCompress() {
return codec != null;
}
@Override
public void reset() throws IOException {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
decompressor = null;
}
init();
}
@Override
public void close() throws IOException {
try {
if (tableStats != null) {
tableStats.setReadBytes(pos - startOffset); // actual processed bytes (decompressed bytes plus overhead)
tableStats.setNumRows(recordCount);
}
IOUtils.cleanup(LOG, reader, is, fis);
fs = null;
is = null;
fis = null;
if (LOG.isDebugEnabled()) {
LOG.debug("CSVScanner processed record:" + recordCount);
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
decompressor = null;
}
}
}
@Override
public boolean isProjectable() {
return true;
}
@Override
public boolean isSelectable() {
return false;
}
@Override
public void setSearchCondition(Object expr) {
}
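/**
 * Seeks to a row boundary. The target offset must either match a row
 * offset seen in the current page or fall inside this fragment; seeking
 * is not supported on compressed files.
 */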
@Override
public void seek(long offset) throws IOException {
if (isCompress()) throw new UnsupportedException();
int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
if (tupleIndex > -1) {
this.currentIdx = tupleIndex;
} else if (isSplittable() && startOffset <= offset && offset <= end) { // the offset must fall within this fragment
eof = false;
fis.seek(offset);
pos = offset;
reader.reset();
this.currentIdx = 0;
this.validIdx = 0;
} else {
throw new IOException("invalid offset: start=" + startOffset + ", end=" + end +
", filePos=" + filePosition.getPos() + ", requested offset=" + offset);
}
}
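// Returns the file offset of the next row to be read, paging in more rows
// if necessary, or -1 when the fragment is exhausted.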
@Override
public long getNextOffset() throws IOException {
if(isCompress()) throw new UnsupportedException();
if (this.currentIdx == this.validIdx) {
if (fragmentable() <= 0) {
return -1;
} else {
page();
if(currentIdx == validIdx) return -1;
}
}
return fileOffsets.get(currentIdx);
}
@Override
public boolean isSplittable(){
return splittable;
}
}
}