blob: 590e0e3d5cdc1bd7e0d0ba97349efe6d85eb87a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.firehose;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@SuppressWarnings("ConstantConditions")
public class InlineFirehoseTest
{
private static final String DIMENSION_0 = "timestamp";
private static final String DIMENSION_1 = "value";
private static final List<String> DIMENSIONS = Arrays.asList(DIMENSION_0, DIMENSION_1);
private static final String DELIMITER = ",";
private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final StringInputRowParser PARSER = new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
DIMENSION_0,
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(DIMENSIONS),
Collections.emptyList(),
Collections.emptyList()
),
DELIMITER,
DIMENSIONS,
false,
0
),
CHARSET.name()
);
private static final String EMPTY = "";
private static final String TIMESTAMP_0 = "0";
private static final String VALUE_0 = "a";
private static final String NOT_EMPTY = TIMESTAMP_0 + DELIMITER + VALUE_0;
private static final String PARSEABLE = NOT_EMPTY;
private static final String NOT_PARSEABLE = VALUE_0 + DELIMITER + TIMESTAMP_0;
private static final String TIMESTAMP_1 = "1";
private static final String VALUE_1 = "b";
private static final String LINE_0 = TIMESTAMP_0 + DELIMITER + VALUE_0;
private static final String LINE_1 = TIMESTAMP_1 + DELIMITER + VALUE_1;
private static final String MULTILINE = LINE_0 + "\n" + LINE_1;
@Test
public void testHasMoreEmpty()
{
InlineFirehose target = create(EMPTY);
Assert.assertFalse(target.hasMore());
}
@Test
public void testHasMoreNotEmpty()
{
InlineFirehose target = create(NOT_EMPTY);
Assert.assertTrue(target.hasMore());
}
@Test(expected = NoSuchElementException.class)
public void testNextRowEmpty()
{
InlineFirehose target = create(EMPTY);
target.nextRow();
}
@Test
public void testNextRowNotEmpty()
{
InlineFirehose target = create(NOT_EMPTY);
InputRow row = target.nextRow();
assertRowValue(VALUE_0, row);
}
@Test(expected = NoSuchElementException.class)
public void testNextRowWithRawEmpty()
{
InlineFirehose target = create(EMPTY);
target.nextRowWithRaw();
}
@Test
public void testNextRowWithRawParseable()
{
final String data = PARSEABLE;
InlineFirehose target = create(data);
InputRowListPlusRawValues rowPlusRaw = target.nextRowWithRaw();
InputRow row = Iterables.getOnlyElement(rowPlusRaw.getInputRows());
assertRowValue(VALUE_0, row);
Map<String, Object> raw = rowPlusRaw.getRawValues();
Map<String, Object> expected = new HashMap<>();
expected.put("timestamp", TIMESTAMP_0);
expected.put("value", VALUE_0);
Assert.assertEquals(expected, raw);
Assert.assertNull(rowPlusRaw.getParseException());
}
@Test
public void testNextRowWithRawNotParseable()
{
final String data = NOT_PARSEABLE;
InlineFirehose target = create(data);
InputRowListPlusRawValues rowPlusRaw = target.nextRowWithRaw();
Assert.assertNull(rowPlusRaw.getInputRows());
Map<String, Object> raw = rowPlusRaw.getRawValues();
Map<String, Object> expected = new HashMap<>();
expected.put("timestamp", VALUE_0);
expected.put("value", TIMESTAMP_0);
Assert.assertEquals(expected, raw);
Assert.assertNotNull(rowPlusRaw.getParseException());
}
@Test
public void testCloseOpen() throws IOException
{
InlineFirehose target = create(NOT_EMPTY);
target.close();
try {
target.nextRow();
Assert.fail("Should not be able to read from closed firehose");
}
catch (NoSuchElementException ignored) {
}
}
@Test
public void testCloseNotOpen()
{
InlineFirehose target = create(NOT_EMPTY);
try {
target.close();
}
catch (IOException e) {
Assert.fail("Should be able to close an opened firehose");
}
try {
target.close();
}
catch (IOException e) {
Assert.fail("Should be able to close a closed firehose");
}
}
@Test
public void testMultiline()
{
InlineFirehose target = create(MULTILINE);
// First line
Assert.assertTrue(target.hasMore());
InputRow row0 = target.nextRow();
assertRowValue(VALUE_0, row0);
// Second line
InputRowListPlusRawValues rowPlusRaw = target.nextRowWithRaw();
assertRowValue(VALUE_1, Iterables.getOnlyElement(rowPlusRaw.getInputRows()));
Map<String, Object> raw = rowPlusRaw.getRawValues();
Map<String, Object> expected = new HashMap<>();
expected.put("timestamp", TIMESTAMP_1);
expected.put("value", VALUE_1);
Assert.assertEquals(expected, raw);
Assert.assertNull(rowPlusRaw.getParseException());
Assert.assertFalse(target.hasMore());
}
private static InlineFirehose create(String data)
{
return new InlineFirehose(data, PARSER);
}
private static void assertRowValue(String expected, InputRow row)
{
Assert.assertNotNull(row);
List<String> values = row.getDimension(DIMENSION_1);
Assert.assertNotNull(values);
Assert.assertEquals(1, values.size());
Assert.assertEquals(expected, values.get(0));
}
}