blob: a85159115fe3f0a2520102b3fd5b82cd4200c314 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.zookeeper;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class ZooKeeperClientAspectJTest {
private ZookeeperServerTest localZkS;
private ZooKeeper localZkc;
private final long ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 2000;
private final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
static {
// load agent with aspectjweaver-Agent for testing
// maven-test waves advice on build-goal so, maven doesn't need explicit loading
// uncomment it while testing on eclipse:
// AgentLoader.loadAgentClass(Agent.class.getName(), null);
}
@Test
public void testZkConnected() throws Exception {
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
try {
ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl(executor);
CompletableFuture<ZooKeeper> zkFuture = zkf.create("127.0.0.1:" + localZkS.getZookeeperPort(),
SessionType.ReadWrite,
(int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
assertTrue(localZkc.getState().isConnected());
assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);
}finally{
if (localZkc != null) {
localZkc.close();
}
executor.shutdown();
}
}
@Test
public void testInitZk() throws Exception {
try {
ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
CompletableFuture<ZooKeeper> zkFuture = zkfactory.create("127.0.0.1:" + localZkS.getZookeeperPort(),
SessionType.ReadWrite,
(int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
assertTrue(localZkc.getState().isConnected());
assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);
String connection = "127.0.0.1:" + localZkS.getZookeeperPort() + "/prefix";
ZooKeeper chrootZkc = PulsarClusterMetadataSetup.initZk(connection, (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
assertTrue(chrootZkc.getState().isConnected());
assertNotEquals(chrootZkc.getState(), States.CONNECTEDREADONLY);
chrootZkc.close();
assertNotNull(localZkc.exists("/prefix", false));
} finally {
if (localZkc != null) {
localZkc.close();
}
}
}
@BeforeMethod
void setup() throws Exception {
localZkS = new ZookeeperServerTest(0);
localZkS.start();
}
@AfterMethod(alwaysRun = true)
void teardown() throws Exception {
localZkS.close();
}
/**
* Verifies that aspect-advice calculates the latency of of zk-operation
*
* @throws Exception
*/
@Test(enabled = false, timeOut = 7000)
void testZkClientAspectJTrigger() throws Exception {
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl(executor);
CompletableFuture<ZooKeeper> zkFuture = zkf.create("127.0.0.1:" + localZkS.getZookeeperPort(),
SessionType.ReadWrite,
(int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
try {
assertTrue(localZkc.getState().isConnected());
assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);
final AtomicInteger writeCount = new AtomicInteger(0);
final AtomicInteger readCount = new AtomicInteger(0);
EventListner listener = new EventListner() {
@Override
public void recordLatency(EventType eventType, long latencyMiliSecond) {
if (eventType.equals(EventType.write)) {
writeCount.incrementAndGet();
} else if (eventType.equals(EventType.read)) {
readCount.incrementAndGet();
}
}
};
ClientCnxnAspect.addListener(listener);
CountDownLatch createLatch = new CountDownLatch(1);
CountDownLatch deleteLatch = new CountDownLatch(1);
CountDownLatch readLatch = new CountDownLatch(1);
CountDownLatch existLatch = new CountDownLatch(1);
localZkc.create("/createTest", "data".getBytes(), Acl, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
createLatch.countDown();
}, "create");
localZkc.delete("/deleteTest", -1, (rc, path, ctx) -> {
deleteLatch.countDown();
}, "delete");
localZkc.exists("/createTest", null, (int rc, String path, Object ctx, Stat stat) -> {
existLatch.countDown();
}, null);
localZkc.getData("/createTest", null, (int rc, String path, Object ctx, byte data[], Stat stat) -> {
readLatch.countDown();
}, null);
createLatch.await();
deleteLatch.await();
existLatch.await();
readLatch.await();
Thread.sleep(500);
Assert.assertEquals(writeCount.get(), 2);
Assert.assertEquals(readCount.get(), 2);
ClientCnxnAspect.removeListener(listener);
} finally {
if (localZkc != null) {
localZkc.close();
}
executor.shutdown();
}
}
/**
* Verifies that aspect-advice calculates the latency of of zk-operation and updates PulsarStats
*
* @throws Exception
*/
@Test(enabled = false, timeOut = 7000)
public void testZkOpStatsMetrics() throws Exception {
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl(executor);
CompletableFuture<ZooKeeper> zkFuture = zkf.create("127.0.0.1:" + localZkS.getZookeeperPort(),
SessionType.ReadWrite,
(int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
MockPulsar mockPulsar = new MockPulsar(localZkc);
mockPulsar.setup();
try {
PulsarClient pulsarClient = mockPulsar.getClient();
PulsarService pulsar = mockPulsar.getPulsar();
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Metrics zkOpMetric = getMetric(pulsar, "zk_write_latency");
Assert.assertNotNull(zkOpMetric);
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_rate_s"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_95percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_99_99_percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_99_9_percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_99_percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_mean_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_median_ms"));
zkOpMetric = getMetric(pulsar, "zk_read_latency");
Assert.assertNotNull(zkOpMetric);
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_rate_s"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_95percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_99_99_percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_99_9_percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_99_percentile_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_mean_ms"));
Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_median_ms"));
CountDownLatch createLatch = new CountDownLatch(1);
CountDownLatch deleteLatch = new CountDownLatch(1);
CountDownLatch readLatch = new CountDownLatch(1);
CountDownLatch existLatch = new CountDownLatch(1);
localZkc.create("/createTest", "data".getBytes(), Acl, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
createLatch.countDown();
}, "create");
localZkc.delete("/deleteTest", -1, (rc, path, ctx) -> {
deleteLatch.countDown();
}, "delete");
localZkc.exists("/createTest", null, (int rc, String path, Object ctx, Stat stat) -> {
existLatch.countDown();
}, null);
localZkc.getData("/createTest", null, (int rc, String path, Object ctx, byte data[], Stat stat) -> {
readLatch.countDown();
}, null);
createLatch.await();
deleteLatch.await();
existLatch.await();
readLatch.await();
Thread.sleep(10);
BrokerService brokerService = pulsar.getBrokerService();
brokerService.updateRates();
List<Metrics> metrics = brokerService.getTopicMetrics();
AtomicDouble writeRate = new AtomicDouble();
AtomicDouble readRate = new AtomicDouble();
metrics.forEach(m -> {
if ("zk_write_latency".equalsIgnoreCase(m.getDimension("metric"))) {
writeRate.set((double) m.getMetrics().get("brk_zk_write_latency_rate_s"));
} else if ("zk_read_latency".equalsIgnoreCase(m.getDimension("metric"))) {
readRate.set((double) m.getMetrics().get("brk_zk_read_latency_rate_s"));
}
});
Assert.assertTrue(readRate.get() > 0);
Assert.assertTrue(writeRate.get() > 0);
} finally {
mockPulsar.cleanup();
if (localZkc != null) {
localZkc.close();
}
executor.shutdown();
}
}
private Metrics getMetric(PulsarService pulsar, String dimension) {
BrokerService brokerService = pulsar.getBrokerService();
brokerService.updateRates();
for (Metrics metric : brokerService.getTopicMetrics()) {
if (dimension.equalsIgnoreCase(metric.getDimension("metric"))) {
return metric;
}
}
return null;
}
static class ZookeeperServerTest implements Closeable {
private final File zkTmpDir;
private ZooKeeperServer zks;
private NIOServerCnxnFactory serverFactory;
private final int zkPort;
private final String hostPort;
public ZookeeperServerTest(int zkPort) throws IOException {
this.zkPort = zkPort;
this.hostPort = "127.0.0.1:" + zkPort;
this.zkTmpDir = File.createTempFile("zookeeper", "test");
log.info("**** Start GZK on {} ****", zkTmpDir);
if (!zkTmpDir.delete() || !zkTmpDir.mkdir()) {
throw new IOException("Couldn't create zk directory " + zkTmpDir);
}
}
public void start() throws IOException {
try {
zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
zks.setMaxSessionTimeout(20000);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(zkPort), 1000);
serverFactory.startup(zks);
} catch (Exception e) {
log.error("Exception while instantiating ZooKeeper", e);
}
LocalBookkeeperEnsemble.waitForServerUp(hostPort, 30000);
log.info("ZooKeeper started at {}", hostPort);
}
public void stop() throws IOException {
zks.shutdown();
serverFactory.shutdown();
log.info("Stoppend ZK server at {}", hostPort);
}
@Override
public void close() throws IOException {
zks.shutdown();
serverFactory.shutdown();
zkTmpDir.delete();
}
public int getZookeeperPort() {
return serverFactory.getLocalPort();
}
private final Logger log = LoggerFactory.getLogger(ZookeeperServerTest.class);
}
class MockPulsar extends BrokerTestBase {
private final ZooKeeper zk;
public MockPulsar(ZooKeeper zk) {
this.zk = zk;
}
@Override
protected void setup() throws Exception {
super.baseSetup();
doReturn(new ZooKeeperClientFactory() {
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
return CompletableFuture.completedFuture(zk);
}
}).when(pulsar).getZooKeeperClientFactory();
}
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public PulsarService getPulsar() {
return pulsar;
}
public PulsarClient getClient() {
return pulsarClient;
}
}
}