| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package storm.kafka; |
| |
| import backtype.storm.Config; |
| import org.apache.curator.test.TestingServer; |
| import kafka.javaapi.consumer.SimpleConsumer; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| |
| import java.util.*; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyInt; |
| import static org.mockito.Mockito.when; |
| |
| public class ZkCoordinatorTest { |
| |
| |
| @Mock |
| private DynamicBrokersReader reader; |
| |
| @Mock |
| private DynamicPartitionConnections dynamicPartitionConnections; |
| |
| private KafkaTestBroker broker = new KafkaTestBroker(); |
| private TestingServer server; |
| private Map stormConf = new HashMap(); |
| private SpoutConfig spoutConfig; |
| private ZkState state; |
| private SimpleConsumer simpleConsumer; |
| |
| @Before |
| public void setUp() throws Exception { |
| MockitoAnnotations.initMocks(this); |
| server = new TestingServer(); |
| String connectionString = server.getConnectString(); |
| ZkHosts hosts = new ZkHosts(connectionString); |
| hosts.refreshFreqSecs = 1; |
| spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id"); |
| Map conf = buildZookeeperConfig(server); |
| state = new ZkState(conf); |
| simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); |
| when(dynamicPartitionConnections.register(any(Broker.class), anyInt())).thenReturn(simpleConsumer); |
| } |
| |
| private Map buildZookeeperConfig(TestingServer server) { |
| Map conf = new HashMap(); |
| conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort()); |
| conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost")); |
| conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000); |
| conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3); |
| conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30); |
| return conf; |
| } |
| |
| @After |
| public void shutdown() throws Exception { |
| simpleConsumer.close(); |
| broker.shutdown(); |
| server.close(); |
| } |
| |
| @Test |
| public void testOnePartitionPerTask() throws Exception { |
| int totalTasks = 64; |
| int partitionsPerTask = 1; |
| List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); |
| when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks)); |
| for (ZkCoordinator coordinator : coordinatorList) { |
| List<PartitionManager> myManagedPartitions = coordinator.getMyManagedPartitions(); |
| assertEquals(partitionsPerTask, myManagedPartitions.size()); |
| assertEquals(coordinator._taskIndex, myManagedPartitions.get(0).getPartition().partition); |
| } |
| } |
| |
| |
| @Test |
| public void testPartitionsChange() throws Exception { |
| final int totalTasks = 64; |
| int partitionsPerTask = 2; |
| List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); |
| when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks, 9092)); |
| List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList); |
| waitForRefresh(); |
| when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfo(totalTasks, 9093)); |
| List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); |
| assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size()); |
| Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator(); |
| for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) { |
| List<PartitionManager> partitionManagersAfter = iterator.next(); |
| assertPartitionsAreDifferent(partitionManagersBefore, partitionManagersAfter, partitionsPerTask); |
| } |
| } |
| |
| private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) { |
| assertEquals(partitionsPerTask, partitionManagersBefore.size()); |
| assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size()); |
| for (int i = 0; i < partitionsPerTask; i++) { |
| assertNotEquals(partitionManagersBefore.get(i).getPartition(), partitionManagersAfter.get(i).getPartition()); |
| } |
| |
| } |
| |
| private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) { |
| List<List<PartitionManager>> partitions = new ArrayList(); |
| for (ZkCoordinator coordinator : coordinatorList) { |
| partitions.add(coordinator.getMyManagedPartitions()); |
| } |
| return partitions; |
| } |
| |
| private void waitForRefresh() throws InterruptedException { |
| Thread.sleep(((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000 + 1); |
| } |
| |
| private List<ZkCoordinator> buildCoordinators(int totalTasks) { |
| List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>(); |
| for (int i = 0; i < totalTasks; i++) { |
| ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader); |
| coordinatorList.add(coordinator); |
| } |
| return coordinatorList; |
| } |
| |
| |
| } |