blob: 66b0393bf51aab58f1e8b348ecf943395d39bbb8 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.loader.reader.file;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.slf4j.Logger;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.parser.CsvLineParser;
import com.baidu.hugegraph.loader.parser.JsonLineParser;
import com.baidu.hugegraph.loader.parser.LineParser;
import com.baidu.hugegraph.loader.parser.TextLineParser;
import com.baidu.hugegraph.loader.reader.InputReader;
import com.baidu.hugegraph.loader.reader.Line;
import com.baidu.hugegraph.loader.reader.Readable;
import com.baidu.hugegraph.loader.source.file.Compression;
import com.baidu.hugegraph.loader.source.file.FileFormat;
import com.baidu.hugegraph.loader.source.file.FileSource;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
public abstract class AbstractFileReader implements InputReader {
private static final Logger LOG = Log.logger(AbstractFileReader.class);
private static final int BUF_SIZE = 5 * 1024 * 1024;
private final FileSource source;
private Readers readers;
private LineParser parser;
private Line nextLine;
public AbstractFileReader(FileSource source) {
this.source = source;
this.readers = null;
this.parser = null;
this.nextLine = null;
}
public FileSource source() {
return this.source;
}
protected abstract Readers openReaders() throws IOException;
@Override
public void init() {
try {
this.readers = this.openReaders();
} catch (IOException e) {
throw new LoadException("Failed to open readers for '%s'",
this.source);
}
this.parser = createLineParser(this.source);
this.parser.init(this);
}
@Override
public boolean hasNext() {
if (this.nextLine != null) {
return true;
}
this.nextLine = this.fetch();
return this.nextLine != null;
}
@Override
public Line next() {
if (!this.hasNext()) {
throw new NoSuchElementException("Reach end of file");
}
Line line = this.nextLine;
this.nextLine = null;
return line;
}
@Override
public void close() throws IOException {
if (this.readers != null) {
this.readers.close();
}
}
public String readNextLine() throws IOException {
E.checkState(this.readers != null, "The readers shouldn't be null");
return this.readers.readNextLine();
}
protected Line fetch() {
String rawLine;
try {
rawLine = this.readNextLine();
} catch (IOException e) {
throw new LoadException("Read next line error", e);
}
if (rawLine == null) {
return null;
}
// Skip the line matched specified regex
if (this.needSkipLine(rawLine)) {
return this.fetch();
} else {
return this.parser.parse(rawLine);
}
}
private boolean needSkipLine(String line) {
return line.matches(this.source.skippedLineRegex());
}
private boolean isDuplicateHeader(String line) {
assert line != null;
// Json file doesn't exist header line
if (this.parser.getClass().isAssignableFrom(TextLineParser.class)) {
return false;
}
/*
* All lines will be treated as data line if the header is
* user specified explicitly
*/
if (this.source.header() != null) {
return false;
}
TextLineParser parser = (TextLineParser) this.parser;
E.checkState(parser.header() != null && !parser.header().isEmpty(),
"The header shoudn't be null or empty");
List<String> columns = parser.split(line);
if (parser.header().size() != columns.size()) {
return false;
}
for (int i = 0; i < parser.header().size(); i++) {
if (!parser.header().get(i).equals(columns.get(i))) {
return false;
}
}
return true;
}
private static BufferedReader createBufferedReader(InputStream stream,
FileSource source)
throws Exception {
E.checkNotNull(stream, "InputStream");
try {
Reader csr = createCompressReader(stream, source);
return new BufferedReader(csr, BUF_SIZE);
} catch (IOException e) {
try {
stream.close();
} catch (IOException ignored) {
LOG.warn("Failed to close file {}", source.path());
}
throw e;
}
}
private static Reader createCompressReader(InputStream stream,
FileSource source)
throws Exception {
Compression compression = source.compression();
String charset = source.charset();
switch (compression) {
case NONE:
return new InputStreamReader(stream, charset);
case GZIP:
case BZ2:
case XZ:
case LZMA:
case PACK200:
case SNAPPY_RAW:
case SNAPPY_FRAMED:
case Z:
case DEFLATE:
case LZ4_BLOCK:
case LZ4_FRAMED:
CompressorStreamFactory factory = new CompressorStreamFactory();
CompressorInputStream cis = factory.createCompressorInputStream(
compression.string(), stream);
return new InputStreamReader(cis, charset);
default:
throw new LoadException("Unsupported compression format '%s'",
compression);
}
}
private static LineParser createLineParser(FileSource source) {
FileFormat format = source.format();
switch (format) {
case CSV:
return new CsvLineParser();
case TEXT:
return new TextLineParser();
case JSON:
return new JsonLineParser();
default:
throw new AssertionError(String.format(
"Unsupported file format '%s'", source));
}
}
/**
* Used to iterate all readable data source, like files, paths
*/
protected final class Readers {
private final FileSource source;
private final List<Readable> readables;
private BufferedReader reader;
private int index;
public Readers(FileSource source, List<Readable> readables) {
this.source = source;
this.readables = readables;
this.index = 0;
if (readables == null || readables.isEmpty()) {
this.reader = null;
} else {
// Open the first one
this.reader = this.open(this.index);
}
}
private BufferedReader open(int index) {
assert index < this.readables.size();
Readable readable = this.readables.get(index);
LOG.debug("Ready to open '{}'", readable);
InputStream stream = null;
try {
stream = readable.open();
return createBufferedReader(stream, this.source);
} catch (IOException e) {
throw new LoadException("Failed to open stream for '%s'",
e, readable);
} catch (Exception e) {
if (stream != null) {
try {
stream.close();
} catch (IOException ignored) {
LOG.warn("Failed to close stream of '{}'", readable);
}
}
throw new LoadException("Failed to create reader for '%s'",
readable);
}
}
private void close(int index) throws IOException {
assert index < this.readables.size();
Readable readable = this.readables.get(index);
LOG.info("Ready to close '{}'", readable);
this.close();
}
public String readNextLine() throws IOException {
// reader is null means there is no file
if (this.reader == null) {
return null;
}
boolean openNext = false;
String line;
while ((line = this.reader.readLine()) == null) {
// The current file is read at the end, ready to read next one
this.close(this.index);
if (++this.index < this.readables.size()) {
// Open the second or subsequent readables, need
this.reader = this.open(this.index);
openNext = true;
} else {
return null;
}
}
// Determine if need to skip duplicate header
if (openNext && isDuplicateHeader(line)) {
line = this.readNextLine();
}
return line;
}
private void close() throws IOException {
if (this.reader != null) {
this.reader.close();
}
}
}
}