blob: 8801e053c583a29d420c5e75016e5963a73162c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.bulkload.pipeline;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat;
/**
* A {@link PipelineBlock}, which splits line according to CSV format rules and unquotes fields. The next block {@link
* PipelineBlock#accept(Object, boolean)} is called per-line.
*/
public class CsvLineProcessorBlock extends PipelineBlock<String, String[]> {
/** Empty string array. */
public static final String[] EMPTY_STR_ARRAY = new String[0];
/** Field delimiter pattern. */
private final char fldDelim;
/** Quote character. */
private final char quoteChars;
/** Null string. */
private final String nullString;
/** Trim field string content. */
private final boolean trim;
/** Lines count. */
private int line = 0;
/** Symbol count. */
private int symbol = 0;
/**
* Creates a CSV line parser.
*/
public CsvLineProcessorBlock(BulkLoadCsvFormat format) {
this.fldDelim = format.fieldSeparator().toString().charAt(0);
this.quoteChars = format.quoteChars().charAt(0);
this.nullString = format.nullString();
this.trim = format.trim();
}
/**
* {@inheritDoc}
*/
@Override public void accept(String input, boolean isLastPortion) throws IgniteCheckedException {
List<String> fields = new ArrayList<>();
StringBuilder currentField = new StringBuilder(256);
ReaderState state = ReaderState.IDLE;
final int length = input.length();
int copy = 0;
int current = 0;
int prev = -1;
int copyStart = 0;
boolean quotesMatched = true;
line++;
symbol = 0;
while (true) {
if (current == length) {
if (!quotesMatched)
throw new IgniteCheckedException(new SQLException("Unmatched quote found at the end of line "
+ line + ", symbol " + symbol));
if (copy > 0)
currentField.append(input, copyStart, copyStart + copy);
addField(fields, currentField, prev == quoteChars);
break;
}
final char c = input.charAt(current++);
symbol++;
if (state == ReaderState.QUOTED) {
if (c == quoteChars) {
state = ReaderState.IDLE;
quotesMatched = !quotesMatched;
if (copy > 0) {
currentField.append(input, copyStart, copyStart + copy);
copy = 0;
}
copyStart = current;
}
else
copy++;
}
else {
if (c == fldDelim) {
if (copy > 0) {
currentField.append(input, copyStart, copyStart + copy);
copy = 0;
}
addField(fields, currentField, prev == quoteChars);
currentField = new StringBuilder();
copyStart = current;
state = ReaderState.IDLE;
}
else if (c == quoteChars && state != ReaderState.UNQUOTED) {
state = ReaderState.QUOTED;
quotesMatched = !quotesMatched;
if (prev == quoteChars)
copy++;
else
copyStart = current;
}
else {
if (c == quoteChars) {
if (state == ReaderState.UNQUOTED)
throw new IgniteCheckedException(
new SQLException("Unexpected quote in the field, line " + line
+ ", symbol " + symbol));
quotesMatched = !quotesMatched;
}
copy++;
if (state == ReaderState.IDLE)
state = ReaderState.UNQUOTED;
}
}
prev = c;
}
nextBlock.accept(fields.toArray(EMPTY_STR_ARRAY), isLastPortion);
}
/**
*
* @param fields row fields.
* @param fieldVal field value.
*/
private void addField(List<String> fields, StringBuilder fieldVal, boolean quoted) {
final String val = trim ? fieldVal.toString().trim() : fieldVal.toString();
fields.add(val.equals(nullString) ? null : val);
}
/**
*
*/
private enum ReaderState {
/** */
IDLE,
/** */
UNQUOTED,
/** */
QUOTED
}
}