blob: a1c64ad6b0fec1116904bf22118776e99961888d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.LineIterator;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.InputRow;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class FileIteratingFirehoseTest extends NullHandlingTest
{
@Parameters(name = "{0}, {1}")
public static Collection<Object[]> constructorFeeder()
{
final List<List<String>> inputTexts = ImmutableList.of(
ImmutableList.of("2000,foo"),
ImmutableList.of("2000,foo\n2000,bar\n"),
ImmutableList.of("2000,foo\n2000,bar\n", "2000,baz"),
ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz"),
ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz", ""),
ImmutableList.of("2000,foo\n2000,bar\n2000,baz", "", "2000,baz", "2000,foo\n2000,bar\n3000,baz"),
ImmutableList.of(""),
ImmutableList.of()
);
final List<Object[]> args = new ArrayList<>();
for (int numSkipHeadRows = 0; numSkipHeadRows < 3; numSkipHeadRows++) {
for (List<String> texts : inputTexts) {
args.add(new Object[] {texts, numSkipHeadRows});
}
}
return args;
}
private static final char[] LINE_CHARS = "\n".toCharArray();
private final StringInputRowParser parser;
private final List<String> inputs;
private final List<String> expectedResults;
public FileIteratingFirehoseTest(List<String> texts, int numSkipHeaderRows)
{
parser = new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null),
",",
ImmutableList.of("ts", "x"),
false,
numSkipHeaderRows
),
null
);
this.inputs = texts;
this.expectedResults = inputs.stream()
.map(input -> input.split("\n"))
.flatMap(lines -> {
final List<String> filteredLines = Arrays
.stream(lines)
.filter(line -> line.length() > 0)
.map(line -> line.split(",")[1])
.collect(Collectors.toList());
final int numRealSkippedRows = Math.min(filteredLines.size(), numSkipHeaderRows);
IntStream.range(0, numRealSkippedRows).forEach(i -> filteredLines.set(i, null));
return filteredLines.stream();
})
.collect(Collectors.toList());
}
@Test
public void testFirehose() throws Exception
{
final List<LineIterator> lineIterators = inputs.stream()
.map(s -> new LineIterator(new StringReader(s)))
.collect(Collectors.toList());
try (final FileIteratingFirehose firehose = new FileIteratingFirehose(lineIterators.iterator(), parser)) {
final List<String> results = new ArrayList<>();
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
results.add(null);
} else {
results.add(Joiner.on("|").join(inputRow.getDimension("x")));
}
}
Assert.assertEquals(expectedResults, results);
}
}
@Test(expected = RuntimeException.class)
public void testClose() throws IOException
{
final LineIterator lineIterator = new LineIterator(new Reader()
{
@Override
public int read(char[] cbuf, int off, int len)
{
System.arraycopy(LINE_CHARS, 0, cbuf, 0, LINE_CHARS.length);
return LINE_CHARS.length;
}
@Override
public void close()
{
throw new RuntimeException("close test for FileIteratingFirehose");
}
});
final TestCloseable closeable = new TestCloseable();
final FileIteratingFirehose firehose = new FileIteratingFirehose(
ImmutableList.of(lineIterator).iterator(),
parser,
closeable
);
firehose.hasMore(); // initialize lineIterator
firehose.close();
Assert.assertTrue(closeable.closed);
}
private static final class TestCloseable implements Closeable
{
private boolean closed;
@Override
public void close()
{
closed = true;
}
}
}