blob: 0284ee0e800d2fef9bc36f637c58e349ac8ec311 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
/**
* Tests for the {@link FlinkKafkaConsumer08}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(Kafka08PartitionDiscoverer.class)
@PowerMockIgnore("javax.management.*")
@Ignore
public class KafkaConsumer08Test {
@Test
public void testValidateZooKeeperConfig() {
try {
// empty
Properties emptyProperties = new Properties();
try {
FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
fail("should fail with an exception");
}
catch (IllegalArgumentException e) {
// expected
}
// no connect string (only group string)
Properties noConnect = new Properties();
noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
try {
FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
fail("should fail with an exception");
}
catch (IllegalArgumentException e) {
// expected
}
// no group string (only connect string)
Properties noGroup = new Properties();
noGroup.put("zookeeper.connect", "localhost:47574");
try {
FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
fail("should fail with an exception");
}
catch (IllegalArgumentException e) {
// expected
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testCreateSourceWithoutCluster() {
try {
Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:56794");
props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
props.setProperty("group.id", "non-existent-group");
props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
Mockito.when(true).thenReturn(true);
consumer.setRuntimeContext(mockRuntimeContext);
consumer.open(new Configuration());
fail();
}
catch (Exception e) {
assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
}
}
@Test
public void testAllBoostrapServerHostsAreInvalid() {
try {
String unknownHost = "foobar:11111";
URL unknownHostURL = NetUtils.getCorrectHostnamePort(unknownHost);
PowerMockito.mockStatic(InetAddress.class);
when(InetAddress.getByName(Matchers.eq(unknownHostURL.getHost()))).thenThrow(new UnknownHostException("Test exception"));
String zookeeperConnect = "localhost:56794";
String groupId = "non-existent-group";
Properties props = createKafkaProps(zookeeperConnect, unknownHost, groupId);
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
Mockito.when(true).thenReturn(true);
consumer.setRuntimeContext(mockRuntimeContext);
consumer.open(new Configuration());
fail();
} catch (Exception expected) {
assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
expected.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+ "' config are invalid"));
}
}
@Test
public void testAtLeastOneBootstrapServerHostIsValid() throws Exception {
try {
String zookeeperConnect = "localhost:56794";
String unknownHost = "foobar:11111";
// we declare one valid bootstrap server, namely the one with 'localhost'
String bootstrapServers = unknownHost + ", localhost:22222";
URL unknownHostURL = NetUtils.getCorrectHostnamePort(unknownHost);
PowerMockito.mockStatic(InetAddress.class);
when(InetAddress.getByName(Matchers.eq(unknownHostURL.getHost()))).thenThrow(new UnknownHostException("Test exception"));
String groupId = "non-existent-group";
Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer(
"no op topic",
new SimpleStringSchema(),
props);
consumer.open(new Configuration());
// no exception should be thrown, because we have one valid bootstrap server; test passes if we reach here
} catch (Exception e) {
assertFalse("No exception should be thrown containing 'all bootstrap servers invalid' message!",
e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+ "' config are invalid"));
}
}
private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
Properties props = new Properties();
props.setProperty("zookeeper.connect", zookeeperConnect);
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("group.id", groupId);
props.setProperty("socket.timeout.ms", "100");
props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
return props;
}
private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumer08<String> {
private static final long serialVersionUID = -3939402845009972810L;
public DummyFlinkKafkaConsumer(String topic, DeserializationSchema<String> schema, Properties props) {
super(Collections.singletonList(topic), schema, props);
}
@Override
public RuntimeContext getRuntimeContext() {
RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
return mockRuntimeContext;
}
}
}