/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;

import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Abstract test base for all format-specific Kafka table sources with builders.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public abstract class KafkaTableSourceBuilderTestBase {

private static final String[] FIELD_NAMES =
new String[]{"field1", "field2", "time1", "time2", "field3"};
private static final InternalType[] FIELD_TYPES =
new InternalType[]{DataTypes.LONG, DataTypes.STRING, DataTypes.TIMESTAMP, DataTypes.TIMESTAMP,
DataTypes.DOUBLE};
private static final String TOPIC = "testTopic";
private static final TableSchema SCHEMA = new TableSchema(FIELD_NAMES, FIELD_TYPES);
private static final Properties PROPS = createSourceProperties();
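
/**
 * Verifies that {@code getDataStream} adds the expected Kafka consumer to the
 * environment and creates it for the configured topic, properties, and
 * deserialization schema.
 */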
@Test
@SuppressWarnings("unchecked")
public void testKafkaConsumer() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);

// assert that the correct consumer and deserialization schema are used
KafkaTableSource observed = spy(b.build());
StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
when(env.addSource(any(SourceFunction.class))).thenReturn(mock(DataStreamSource.class));
observed.getDataStream(env);
verify(env).addSource(any(getFlinkKafkaConsumer()));
verify(observed).getKafkaConsumer(
eq(TOPIC),
eq(PROPS),
any(getDeserializationSchema()));
}
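
/**
 * Verifies that the builder forwards the configured schema with all field
 * names and types to the created table source.
 */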
@Test
public void testTableSchema() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);
KafkaTableSource source = b.build();
// check table schema
TableSchema schema = source.getTableSchema();
assertNotNull(schema);
assertEquals(5, schema.getColumnNames().length);
// check table fields
assertEquals("field1", schema.getColumnNames()[0]);
assertEquals("field2", schema.getColumnNames()[1]);
assertEquals("time1", schema.getColumnNames()[2]);
assertEquals("time2", schema.getColumnNames()[3]);
assertEquals("field3", schema.getColumnNames()[4]);
assertEquals(DataTypes.LONG, schema.getTypes()[0]);
assertEquals(DataTypes.STRING, schema.getTypes()[1]);
assertEquals(DataTypes.TIMESTAMP, schema.getTypes()[2]);
assertEquals(DataTypes.TIMESTAMP, schema.getTypes()[3]);
assertEquals(DataTypes.DOUBLE, schema.getTypes()[4]);
}
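
/**
 * Verifies that a source built without time attributes exposes neither a
 * proctime attribute nor rowtime attribute descriptors.
 */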
@Test
public void testNoTimeAttributes() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);
KafkaTableSource source = b.build();
// assert no proctime
assertNull(source.getProctimeAttribute());
// assert no rowtime
assertNotNull(source.getRowtimeAttributeDescriptors());
assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
}
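
/**
 * Verifies that {@code withProctimeAttribute} sets the proctime field and
 * leaves the rowtime attribute descriptors empty.
 */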
@Test
public void testProctimeAttribute() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);
b.withProctimeAttribute("time1");
KafkaTableSource source = b.build();
// assert correct proctime field
assertEquals("time1", source.getProctimeAttribute());
// assert no rowtime
assertNotNull(source.getRowtimeAttributeDescriptors());
assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
}
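
/**
 * Verifies that {@code withRowtimeAttribute} produces a single rowtime
 * descriptor carrying the given field, timestamp extractor, and watermark
 * strategy.
 */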
@Test
public void testRowtimeAttribute() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);
b.withRowtimeAttribute("time2", new ExistingField("time2"), new AscendingTimestamps());
KafkaTableSource source = b.build();
// assert no proctime
assertNull(source.getProctimeAttribute());
// assert correct rowtime descriptor
List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
assertNotNull(descs);
assertEquals(1, descs.size());
RowtimeAttributeDescriptor desc = descs.get(0);
assertEquals("time2", desc.getAttributeName());
// assert timestamp extractor
assertTrue(desc.getTimestampExtractor() instanceof ExistingField);
assertEquals(1, desc.getTimestampExtractor().getArgumentFields().length);
assertEquals("time2", desc.getTimestampExtractor().getArgumentFields()[0]);
// assert watermark strategy
assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
}
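
/**
 * Verifies that {@code withKafkaTimestampAsRowtimeAttribute} derives the
 * rowtime from the Kafka record timestamp via {@code StreamRecordTimestamp},
 * for builders that support Kafka timestamps.
 */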
@Test
public void testRowtimeAttribute2() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);
try {
b.withKafkaTimestampAsRowtimeAttribute("time2", new AscendingTimestamps());
KafkaTableSource source = b.build();
// assert no proctime
assertNull(source.getProctimeAttribute());
// assert correct rowtime descriptor
List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
assertNotNull(descs);
assertEquals(1, descs.size());
RowtimeAttributeDescriptor desc = descs.get(0);
assertEquals("time2", desc.getAttributeName());
// assert timestamp extractor
assertTrue(desc.getTimestampExtractor() instanceof StreamRecordTimestamp);
assertEquals(0, desc.getTimestampExtractor().getArgumentFields().length);
// assert watermark strategy
assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
} catch (Exception e) {
if (b.supportsKafkaTimestamps()) {
// a builder that supports Kafka timestamps must not throw here
fail(e.getMessage());
}
}
}
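
/**
 * Verifies that each start-up mode configured on the builder is forwarded to
 * the created consumer: group offsets (the default), earliest, latest, and
 * specific offsets.
 */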
@Test
@SuppressWarnings("unchecked")
public void testConsumerOffsets() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);

// test the default behavior
KafkaTableSource source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();

// test reading from earliest
b.fromEarliest();
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromEarliest();

// test reading from latest
b.fromLatest();
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromLatest();

// test reading from group offsets
b.fromGroupOffsets();
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();

// test reading from given offsets
b.fromSpecificOffsets(mock(Map.class));
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromSpecificOffsets(any(Map.class));
}
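
/** Returns the format-specific builder under test. */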
protected abstract KafkaTableSource.Builder getBuilder();
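
/** Returns the class of the deserialization schema expected for the format. */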
protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
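
/** Returns the class of the Kafka consumer expected to be created. */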
protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
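
/** Configures the builder with the common test topic, properties, and schema. */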
protected void configureBuilder(KafkaTableSource.Builder builder) {
builder
.forTopic(TOPIC)
.withKafkaProperties(PROPS)
.withSchema(SCHEMA);
}
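
/** Creates minimal Kafka properties required by the test sources. */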
private static Properties createSourceProperties() {
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "dummy");
properties.setProperty("group.id", "dummy");
return properties;
}
}