blob: 277daeef6fb9de99fa9bdd46125b600caa0522c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.utils;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
public class KafkaUtilsTest {
public static class ZkMockedUtils {
@Mock
CuratorFramework client;
@Mock
GetChildrenBuilder childrenBuilder;
@Mock
GetDataBuilder dataBuilder;
/**
* {
* "host": "192.168.1.148",
* "port": 9092
* }
*/
@Multiline
public static String brokerWithHostPort;
@Test
public void testGetEndpointsFromZookeeperHostPort() throws Exception {
ArrayList<String> brokerIds = new ArrayList<>();
brokerIds.add("1");
when(client.getChildren()).thenReturn(childrenBuilder);
when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
when(client.getData()).thenReturn(dataBuilder);
when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithHostPort.getBytes(
StandardCharsets.UTF_8));
ArrayList<String> expected = new ArrayList<>();
expected.add("192.168.1.148:9092");
assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
}
/**
* {
* "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093", "SASL_PLAINTEXT://host1:9094", "PLAINTEXTSASL://host1:9095"]
* }
*/
@Multiline
public static String brokerWithEndpoints;
@Test
public void testGetEndpointsFromZookeeperEndpoints() throws Exception {
ArrayList<String> brokerIds = new ArrayList<>();
brokerIds.add("1");
when(client.getChildren()).thenReturn(childrenBuilder);
when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
when(client.getData()).thenReturn(dataBuilder);
when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithEndpoints.getBytes(
StandardCharsets.UTF_8));
ArrayList<String> expected = new ArrayList<>();
expected.add("host1:9092");
expected.add("host1:9093");
expected.add("host1:9094");
expected.add("host1:9095");
assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
}
/**
* {
* "host": "192.168.1.148",
* "port": 9092,
* "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093"]
* }
*/
@Multiline
public static String brokerWithHostPortAndEndpoints;
@Test
public void testGetEndpointsFromZookeeperHostPortAndEndpoints() throws Exception {
ArrayList<String> brokerIds = new ArrayList<>();
brokerIds.add("1");
when(client.getChildren()).thenReturn(childrenBuilder);
when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
when(client.getData()).thenReturn(dataBuilder);
when(dataBuilder.forPath("/brokers/ids/1"))
.thenReturn(brokerWithHostPortAndEndpoints.getBytes(StandardCharsets.UTF_8));
ArrayList<String> expected = new ArrayList<>();
expected.add("192.168.1.148:9092");
assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
}
}
public static List<Object[]> data() {
String[] hostnames = new String[]{"node1", "localhost", "192.168.0.1", "my.domain.com"};
String[] schemes = new String[]{"SSL", "PLAINTEXTSASL", "PLAINTEXT", "SASL_PLAINTEXT"};
String[] ports = new String[]{"6667", "9091", null};
List<Object[]> ret = new ArrayList<>();
for (String scheme : schemes) {
for (String hostname : hostnames) {
for (String port : ports) {
port = port != null ? (":" + port) : "";
String expected = hostname + port;
ret.add(new Object[]{scheme + "://" + expected, expected});
}
}
}
return ret;
}
@ParameterizedTest
@MethodSource("data")
public void testEndpointParsing(String endpoint, String expected) {
assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0));
}
}