| /** |
| * Copyright 2016 Yahoo Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.yahoo.pulsar.broker.service; |
| |
| import static com.yahoo.pulsar.broker.ServiceConfigurationLoader.create; |
| import static org.testng.Assert.assertEquals; |
| |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.bookkeeper.test.PortManager; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.yahoo.pulsar.broker.PulsarService; |
| import com.yahoo.pulsar.broker.ServiceConfiguration; |
| import com.yahoo.pulsar.client.admin.PulsarAdmin; |
| import com.yahoo.pulsar.client.api.Authentication; |
| import com.yahoo.pulsar.client.api.ClientConfiguration; |
| import com.yahoo.pulsar.client.api.Consumer; |
| import com.yahoo.pulsar.client.api.Message; |
| import com.yahoo.pulsar.client.api.MessageBuilder; |
| import com.yahoo.pulsar.client.api.MessageId; |
| import com.yahoo.pulsar.client.api.Producer; |
| import com.yahoo.pulsar.client.api.ProducerConfiguration; |
| import com.yahoo.pulsar.client.api.PulsarClient; |
| import com.yahoo.pulsar.client.util.FutureUtil; |
| import com.yahoo.pulsar.common.naming.DestinationName; |
| import com.yahoo.pulsar.common.policies.data.ClusterData; |
| import com.yahoo.pulsar.common.policies.data.PropertyAdmin; |
| import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble; |
| import com.yahoo.pulsar.zookeeper.ZookeeperServerTest; |
| |
| public class ReplicatorTestBase { |
| URL url1; |
| PulsarService pulsar1; |
| BrokerService ns1; |
| |
| PulsarAdmin admin1; |
| LocalBookkeeperEnsemble bkEnsemble1; |
| |
| URL url2; |
| ServiceConfiguration config2; |
| PulsarService pulsar2; |
| BrokerService ns2; |
| PulsarAdmin admin2; |
| LocalBookkeeperEnsemble bkEnsemble2; |
| |
| URL url3; |
| ServiceConfiguration config3; |
| PulsarService pulsar3; |
| BrokerService ns3; |
| PulsarAdmin admin3; |
| LocalBookkeeperEnsemble bkEnsemble3; |
| |
| ZookeeperServerTest globalZkS; |
| |
| ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); |
| |
| // Default frequency |
| public int getBrokerServicePurgeInactiveFrequency() { |
| return 60; |
| } |
| |
| public boolean isBrokerServicePurgeInactiveDestination() { |
| return false; |
| } |
| |
| void setup() throws Exception { |
| log.info("--- Starting ReplicatorTestBase::setup ---"); |
| int globalZKPort = PortManager.nextFreePort(); |
| globalZkS = new ZookeeperServerTest(globalZKPort); |
| globalZkS.start(); |
| |
| // Start region 1 |
| int zkPort1 = PortManager.nextFreePort(); |
| bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort()); |
| bkEnsemble1.start(); |
| |
| int webServicePort1 = PortManager.nextFreePort(); |
| |
| // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have |
| // completely |
| // independent config objects instead of referring to the same properties object |
| ServiceConfiguration config1 = create(new Properties(System.getProperties())); |
| config1.setClusterName("r1"); |
| config1.setWebServicePort(webServicePort1); |
| config1.setZookeeperServers("127.0.0.1:" + zkPort1); |
| config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); |
| config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveDestination()); |
| config1.setBrokerServicePurgeInactiveFrequencyInSeconds( |
| inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); |
| config1.setBrokerServicePort(PortManager.nextFreePort()); |
| pulsar1 = new PulsarService(config1); |
| pulsar1.start(); |
| ns1 = pulsar1.getBrokerService(); |
| |
| url1 = new URL("http://127.0.0.1:" + webServicePort1); |
| admin1 = new PulsarAdmin(url1, (Authentication) null); |
| |
| // Start region 2 |
| |
| // Start zk & bks |
| int zkPort2 = PortManager.nextFreePort(); |
| bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort()); |
| bkEnsemble2.start(); |
| |
| int webServicePort2 = PortManager.nextFreePort(); |
| config2 = create(new Properties(System.getProperties())); |
| config2.setClusterName("r2"); |
| config2.setWebServicePort(webServicePort2); |
| config2.setZookeeperServers("127.0.0.1:" + zkPort2); |
| config2.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); |
| config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveDestination()); |
| config2.setBrokerServicePurgeInactiveFrequencyInSeconds( |
| inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); |
| config2.setBrokerServicePort(PortManager.nextFreePort()); |
| pulsar2 = new PulsarService(config2); |
| pulsar2.start(); |
| ns2 = pulsar2.getBrokerService(); |
| |
| url2 = new URL("http://127.0.0.1:" + webServicePort2); |
| admin2 = new PulsarAdmin(url2, (Authentication) null); |
| |
| // Start region 3 |
| |
| // Start zk & bks |
| int zkPort3 = PortManager.nextFreePort(); |
| bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort()); |
| bkEnsemble3.start(); |
| |
| int webServicePort3 = PortManager.nextFreePort(); |
| config3 = create(new Properties(System.getProperties())); |
| config3.setClusterName("r3"); |
| config3.setWebServicePort(webServicePort3); |
| config3.setZookeeperServers("127.0.0.1:" + zkPort3); |
| config3.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); |
| config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveDestination()); |
| config3.setBrokerServicePurgeInactiveFrequencyInSeconds( |
| inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); |
| config3.setBrokerServicePort(PortManager.nextFreePort()); |
| pulsar3 = new PulsarService(config3); |
| pulsar3.start(); |
| ns3 = pulsar3.getBrokerService(); |
| |
| url3 = new URL("http://127.0.0.1:" + webServicePort3); |
| admin3 = new PulsarAdmin(url3, (Authentication) null); |
| |
| // Provision the global namespace |
| admin1.clusters().createCluster("r1", new ClusterData(url1.toString())); |
| admin1.clusters().createCluster("r2", new ClusterData(url2.toString())); |
| admin1.clusters().createCluster("r3", new ClusterData(url3.toString())); |
| |
| admin1.clusters().createCluster("global", new ClusterData("http://global:8080")); |
| admin1.properties().createProperty("pulsar", |
| new PropertyAdmin(Lists.newArrayList("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3"))); |
| admin1.namespaces().createNamespace("pulsar/global/ns"); |
| admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Lists.newArrayList("r1", "r2", "r3")); |
| |
| assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString()); |
| assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString()); |
| assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString()); |
| admin1.namespaces().createNamespace("pulsar/global/ns1"); |
| admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns1", Lists.newArrayList("r1", "r2")); |
| /* |
| * assertEquals(admin2.clusters().getCluster("global").getServiceUrl(), "http://global:8080"); |
| * assertEquals(admin2.properties().getPropertyAdmin("pulsar").getAdminRoles(), Lists.newArrayList("appid1", |
| * "appid2")); assertEquals(admin2.namespaces().getPolicies("pulsar/global/ns").replication_clusters, |
| * Lists.newArrayList("r1", "r2", "r3")); |
| * |
| * admin1.namespaces().createNamespace("pulsar/global/ns2"); |
| * admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns2", Lists.newArrayList("r1", "r2", |
| * "r3")); |
| */ |
| Thread.sleep(100); |
| log.info("--- ReplicatorTestBase::setup completed ---"); |
| |
| } |
| |
| private int inSec(int time, TimeUnit unit) { |
| return (int) TimeUnit.SECONDS.convert(time, unit); |
| } |
| |
| void shutdown() throws Exception { |
| log.info("--- Shutting down ---"); |
| executor.shutdown(); |
| |
| admin1.close(); |
| admin2.close(); |
| admin3.close(); |
| |
| pulsar3.close(); |
| ns3.close(); |
| |
| pulsar2.close(); |
| ns2.close(); |
| |
| pulsar1.close(); |
| ns1.close(); |
| |
| bkEnsemble1.stop(); |
| bkEnsemble2.stop(); |
| bkEnsemble3.stop(); |
| globalZkS.stop(); |
| } |
| |
| static class MessageProducer { |
| URL url; |
| String namespace; |
| String topicName; |
| PulsarClient client; |
| Producer producer; |
| |
| MessageProducer(URL url, final DestinationName dest) throws Exception { |
| this.url = url; |
| this.namespace = dest.getNamespace(); |
| this.topicName = dest.toString(); |
| ClientConfiguration conf = new ClientConfiguration(); |
| conf.setStatsInterval(0, TimeUnit.SECONDS); |
| client = PulsarClient.create(url.toString(), conf); |
| producer = client.createProducer(topicName); |
| |
| } |
| |
| MessageProducer(URL url, final DestinationName dest, boolean batch) throws Exception { |
| this.url = url; |
| this.namespace = dest.getNamespace(); |
| this.topicName = dest.toString(); |
| ClientConfiguration conf = new ClientConfiguration(); |
| conf.setStatsInterval(0, TimeUnit.SECONDS); |
| client = PulsarClient.create(url.toString(), conf); |
| ProducerConfiguration producerConfiguration = new ProducerConfiguration(); |
| if (batch) { |
| producerConfiguration.setBatchingEnabled(true); |
| producerConfiguration.setBatchingMaxMessages(5); |
| } |
| producer = client.createProducer(topicName); |
| |
| } |
| |
| void produceBatch(int messages) throws Exception { |
| log.info("Start sending batch messages"); |
| List<CompletableFuture<MessageId>> futureList = new ArrayList<>(); |
| |
| for (int i = 0; i < messages; i++) { |
| futureList.add(producer.sendAsync(("test-" + i).getBytes())); |
| log.info("queued message {}", ("test-" + i)); |
| } |
| FutureUtil.waitForAll(futureList).get(); |
| } |
| |
| void produce(int messages) throws Exception { |
| |
| log.info("Start sending messages"); |
| for (int i = 0; i < messages; i++) { |
| producer.send(("test-" + i).getBytes()); |
| log.info("Sent message {}", ("test-" + i)); |
| } |
| |
| } |
| |
| void produce(int messages, MessageBuilder messageBuilder) throws Exception { |
| log.info("Start sending messages"); |
| for (int i = 0; i < messages; i++) { |
| final String m = new String("test-builder-" + i); |
| messageBuilder.setContent(m.getBytes()); |
| producer.send(messageBuilder.build()); |
| log.info("Sent message {}", m); |
| } |
| } |
| |
| void close() throws Exception { |
| client.close(); |
| } |
| |
| } |
| |
| static class MessageConsumer { |
| final URL url; |
| final String namespace; |
| final String topicName; |
| final PulsarClient client; |
| final Consumer consumer; |
| |
| MessageConsumer(URL url, final DestinationName dest) throws Exception { |
| this(url, dest, "sub-id"); |
| } |
| |
| MessageConsumer(URL url, final DestinationName dest, String subId) throws Exception { |
| this.url = url; |
| this.namespace = dest.getNamespace(); |
| this.topicName = dest.toString(); |
| ClientConfiguration conf = new ClientConfiguration(); |
| conf.setStatsInterval(0, TimeUnit.SECONDS); |
| client = PulsarClient.create(url.toString(), conf); |
| |
| try { |
| consumer = client.subscribe(topicName, subId); |
| } catch (Exception e) { |
| client.close(); |
| throw e; |
| } |
| } |
| |
| void receive(int messages) throws Exception { |
| log.info("Start receiving messages"); |
| Message msg = null; |
| |
| for (int i = 0; i < messages; i++) { |
| msg = consumer.receive(); |
| consumer.acknowledge(msg); |
| String msgData = new String(msg.getData()); |
| assertEquals(msgData, "test-" + i); |
| log.info("Received message {}", msgData); |
| } |
| } |
| |
| boolean drained() throws Exception { |
| return consumer.receive(0, TimeUnit.MICROSECONDS) == null; |
| } |
| |
| void close() throws Exception { |
| client.close(); |
| } |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(ReplicatorTestBase.class); |
| } |