/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.metadata.input;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.easymock.EasyMock;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.skife.jdbi.v2.DBI;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
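
/**
 * Tests for {@link SqlInputSource}: JSON serde, split creation and estimation, reading rows
 * from Derby tables, and sampling with raw values.
 */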
public class SqlInputSourceTest
{
private static final List<File> FIREHOSE_TMP_DIRS = new ArrayList<>();
  private static final String TABLE_NAME_1 = "FOOS_TABLE_1";
  private static final String TABLE_NAME_2 = "FOOS_TABLE_2";

  private static final List<String> SQLLIST1 = ImmutableList.of("SELECT timestamp,a,b FROM FOOS_TABLE_1");
  private static final List<String> SQLLIST2 = ImmutableList.of(
      "SELECT timestamp,a,b FROM FOOS_TABLE_1",
      "SELECT timestamp,a,b FROM FOOS_TABLE_2"
  );

private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")),
new ArrayList<>(),
new ArrayList<>()
),
ColumnsFilter.all()
);

  @Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper mapper = TestHelper.makeSmileMapper();
private TestDerbyConnector derbyConnector;
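
  // Register the Jackson modules that declare the SqlInputSource subtype on the shared mapper.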
@Before
public void setUp()
{
for (Module jacksonModule : new InputSourceModule().getJacksonModules()) {
mapper.registerModule(jacksonModule);
}
}
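
  // Delete every temp dir created by createFirehoseTmpDir() once all tests have run.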
@AfterClass
public static void teardown() throws IOException
{
for (File dir : FIREHOSE_TMP_DIRS) {
FileUtils.forceDelete(dir);
}
}
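
  // Each test table contains 10 rows, so N queries yield 10 * N rows; sort them
  // deterministically, then check the expected per-minute timestamps and dimension values.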
private void assertResult(List<Row> rows, List<String> sqls)
{
Assert.assertEquals(10 * sqls.size(), rows.size());
    rows.sort(Comparator.comparing(Row::getTimestamp)
                        .thenComparingInt(r -> Integer.parseInt(r.getDimension("a").get(0)))
                        .thenComparingInt(r -> Integer.parseInt(r.getDimension("b").get(0))));
int rowCount = 0;
for (int i = 0; i < 10; i++) {
for (int j = 0; j < sqls.size(); j++) {
final Row row = rows.get(rowCount);
      final String expectedTimestamp = StringUtils.format("2011-01-12T00:0%s:00.000Z", i);
      Assert.assertEquals(expectedTimestamp, row.getTimestamp().toString());
      Assert.assertEquals(i, Integer.parseInt(row.getDimension("a").get(0)));
      Assert.assertEquals(i, Integer.parseInt(row.getDimension("b").get(0)));
rowCount++;
}
}
}
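
  // File.createTempFile() is used only to reserve a unique path; the file is then replaced
  // with a directory and tracked so teardown() can remove it.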
private File createFirehoseTmpDir(String dirSuffix) throws IOException
{
final File firehoseTempDir = File.createTempFile(
SqlInputSourceTest.class.getSimpleName(),
dirSuffix
);
FileUtils.forceDelete(firehoseTempDir);
FileUtils.forceMkdir(firehoseTempDir);
FIREHOSE_TMP_DIRS.add(firehoseTempDir);
return firehoseTempDir;
}
@Test
public void testSerde() throws IOException
{
mapper.registerSubtypes(TestSerdeFirehoseConnector.class);
    final TestSerdeFirehoseConnector testSerdeFirehoseConnector =
        new TestSerdeFirehoseConnector(new MetadataStorageConnectorConfig());
final SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testSerdeFirehoseConnector, mapper);
final String valueString = mapper.writeValueAsString(sqlInputSource);
final SqlInputSource inputSourceFromJson = mapper.readValue(valueString, SqlInputSource.class);
Assert.assertEquals(sqlInputSource, inputSourceFromJson);
}
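
  // One query against one table should yield all 10 rows through the reader.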
@Test
public void testSingleSplit() throws Exception
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
final File tempDir = createFirehoseTmpDir("testSingleSplit");
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper);
InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
    final List<Row> rows = new ArrayList<>();
    try (CloseableIterator<InputRow> resultIterator = sqlReader.read()) {
      while (resultIterator.hasNext()) {
        rows.add(resultIterator.next());
      }
    }
assertResult(rows, SQLLIST1);
testUtils.dropTable(TABLE_NAME_1);
}
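
  // Two queries against two tables should be read back as one combined stream of 20 rows.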
@Test
public void testMultipleSplits() throws Exception
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
testUtils.createAndUpdateTable(TABLE_NAME_2, 10);
final File tempDir = createFirehoseTmpDir("testMultipleSplit");
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper);
InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
    final List<Row> rows = new ArrayList<>();
    try (CloseableIterator<InputRow> resultIterator = sqlReader.read()) {
      while (resultIterator.hasNext()) {
        rows.add(resultIterator.next());
      }
    }
assertResult(rows, SQLLIST2);
testUtils.dropTable(TABLE_NAME_1);
testUtils.dropTable(TABLE_NAME_2);
}
@Test
public void testNumSplits()
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper);
InputFormat inputFormat = EasyMock.createMock(InputFormat.class);
Stream<InputSplit<String>> sqlSplits = sqlInputSource.createSplits(inputFormat, null);
Assert.assertEquals(SQLLIST2, sqlSplits.map(InputSplit::get).collect(Collectors.toList()));
Assert.assertEquals(2, sqlInputSource.estimateNumSplits(inputFormat, null));
}
@Test
public void testSample() throws Exception
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
try {
final File tempDir = createFirehoseTmpDir("testSingleSplit");
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper);
InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
      final List<InputRowListPlusRawValues> rows = new ArrayList<>();
      try (CloseableIterator<InputRowListPlusRawValues> resultIterator = sqlReader.sample()) {
        while (resultIterator.hasNext()) {
          InputRowListPlusRawValues row = resultIterator.next();
          Assert.assertNull(row.getParseException());
          rows.add(row);
        }
      }
assertResult(rows.stream().flatMap(r -> r.getInputRows().stream()).collect(Collectors.toList()), SQLLIST1);
}
finally {
testUtils.dropTable(TABLE_NAME_1);
}
}
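
  // objectMapper is excluded from equals/hashCode, so EqualsVerifier is told to ignore it
  // and is supplied prefab ObjectMapper instances instead of constructing its own.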
@Test
public void testEquals()
{
EqualsVerifier.forClass(SqlInputSource.class)
.withPrefabValues(
ObjectMapper.class,
new ObjectMapper(),
new ObjectMapper()
)
.withIgnoredFields("objectMapper")
.withNonnullFields("sqls", "sqlFirehoseDatabaseConnector")
.usingGetClass()
.verify();
}
@JsonTypeName("test")
private static class TestSerdeFirehoseConnector extends SQLFirehoseDatabaseConnector
{
private final DBI dbi;
private final MetadataStorageConnectorConfig metadataStorageConnectorConfig;

    private TestSerdeFirehoseConnector(
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig
)
{
final BasicDataSource datasource = getDatasource(
metadataStorageConnectorConfig,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("user", "create");
}
}
);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
this.dbi = new DBI(datasource);
this.metadataStorageConnectorConfig = metadataStorageConnectorConfig;
}
@JsonProperty("connectorConfig")
public MetadataStorageConnectorConfig getConnectorConfig()
{
return metadataStorageConnectorConfig;
}

    @Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestSerdeFirehoseConnector that = (TestSerdeFirehoseConnector) o;
return metadataStorageConnectorConfig.equals(that.metadataStorageConnectorConfig);
}

    @Override
public int hashCode()
{
return Objects.hash(metadataStorageConnectorConfig);
}

    @Override
public DBI getDBI()
{
return dbi;
}

    @Override
public Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown)
{
return ImmutableSet.of("user", "create");
}
}
}