blob: 8c7cbd2a99f59dd958721c80f53dd160dcb958de [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hedwig.server.topics;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.protobuf.ByteString;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.CompositeException;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.ConcurrencyUtils;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.hedwig.util.Pair;
import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestZkTopicManager extends ZooKeeperTestBase {
static Logger LOG = LoggerFactory.getLogger(TestZkTopicManager.class);
protected ZkTopicManager tm;
protected class CallbackQueue<T> implements Callback<T> {
SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>();
public SynchronousQueue<Either<T, Exception>> getQueue() {
return q;
}
public Either<T, Exception> take() throws InterruptedException {
return q.take();
}
@Override
public void operationFailed(Object ctx, final PubSubException exception) {
LOG.error("got exception: " + exception);
new Thread(new Runnable() {
@Override
public void run() {
ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception));
}
}).start();
}
@Override
public void operationFinished(Object ctx, final T resultOfOperation) {
new Thread(new Runnable() {
@Override
public void run() {
ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null));
}
}).start();
}
}
protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>();
protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>();
protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>();
protected ByteString topic = ByteString.copyFromUtf8("topic");
protected ServerConfiguration cfg;
protected HedwigSocketAddress me;
protected ScheduledExecutorService scheduler;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
cfg = new ServerConfiguration();
me = cfg.getServerAddr();
scheduler = Executors.newSingleThreadScheduledExecutor();
tm = new ZkTopicManager(zk, cfg, scheduler);
}
@Test
public void testGetOwnerSingle() throws Exception {
tm.getOwner(topic, false, addrCbq, null);
Assert.assertEquals(me, check(addrCbq.take()));
}
protected ByteString mkTopic(int i) {
return ByteString.copyFromUtf8(topic.toStringUtf8() + i);
}
protected <T> T check(Either<T, Exception> ex) throws Exception {
if (ex.left() == null)
throw ex.right();
else
return ex.left();
}
public static class CustomServerConfiguration extends ServerConfiguration {
int port;
public CustomServerConfiguration(int port) {
this.port = port;
}
@Override
public int getServerPort() {
return port;
}
}
@Test
public void testGetOwnerMulti() throws Exception {
ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1), cfg2 = new CustomServerConfiguration(
cfg.getServerPort() + 2);
// TODO change cfg1 cfg2 params
ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler), tm2 = new ZkTopicManager(zk, cfg2, scheduler);
tm.getOwner(topic, false, addrCbq, null);
HedwigSocketAddress owner = check(addrCbq.take());
// If we were told to have another person claim the topic, make them
// claim the topic.
if (owner.getPort() == cfg1.getServerPort())
tm1.getOwner(topic, true, addrCbq, null);
else if (owner.getPort() == cfg2.getServerPort())
tm2.getOwner(topic, true, addrCbq, null);
if (owner.getPort() != cfg.getServerPort())
Assert.assertEquals(owner, check(addrCbq.take()));
for (int i = 0; i < 100; ++i) {
tm.getOwner(topic, false, addrCbq, null);
Assert.assertEquals(owner, check(addrCbq.take()));
tm1.getOwner(topic, false, addrCbq, null);
Assert.assertEquals(owner, check(addrCbq.take()));
tm2.getOwner(topic, false, addrCbq, null);
Assert.assertEquals(owner, check(addrCbq.take()));
}
// Give us 100 chances to choose another owner if not shouldClaim.
for (int i = 0; i < 100; ++i) {
if (!owner.equals(me))
break;
tm.getOwner(mkTopic(i), false, addrCbq, null);
owner = check(addrCbq.take());
if (i == 99)
Assert.fail("Never chose another owner");
}
// Make sure we always choose ourselves if shouldClaim.
for (int i = 0; i < 100; ++i) {
tm.getOwner(mkTopic(100), true, addrCbq, null);
Assert.assertEquals(me, check(addrCbq.take()));
}
}
@Test
public void testLoadBalancing() throws Exception {
tm.getOwner(topic, false, addrCbq, null);
Assert.assertEquals(me, check(addrCbq.take()));
ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
new ZkTopicManager(zk, cfg1, scheduler);
ByteString topic1 = mkTopic(1);
tm.getOwner(topic1, false, addrCbq, null);
Assert.assertEquals(cfg1.getServerAddr(), check(addrCbq.take()));
}
class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
boolean failure;
SynchronousQueue<Pair<ByteString, Boolean>> bsQueue;
public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) {
this.bsQueue = bsQueue;
}
public void setFailure(boolean failure) {
this.failure = failure;
}
@Override
public void lostTopic(final ByteString topic) {
new Thread(new Runnable() {
@Override
public void run() {
ConcurrencyUtils.put(bsQueue, Pair.of(topic, false));
}
}).start();
}
public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) {
new Thread(new Runnable() {
@Override
public void run() {
ConcurrencyUtils.put(bsQueue, Pair.of(topic, true));
if (failure) {
callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
} else {
callback.operationFinished(ctx, null);
}
}
}).start();
}
}
@Test
public void testOwnershipChange() throws Exception {
SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>();
StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue);
tm.addTopicOwnershipChangeListener(listener);
// regular acquire
tm.getOwner(topic, true, addrCbq, null);
Pair<ByteString, Boolean> pair = bsQueue.take();
Assert.assertEquals(topic, pair.first());
Assert.assertTrue(pair.second());
Assert.assertEquals(me, check(addrCbq.take()));
assertOwnershipNodeExists();
// topic that I already own
tm.getOwner(topic, true, addrCbq, null);
Assert.assertEquals(me, check(addrCbq.take()));
Assert.assertTrue(bsQueue.isEmpty());
assertOwnershipNodeExists();
// regular release
tm.releaseTopic(topic, cb, null);
pair = bsQueue.take();
Assert.assertEquals(topic, pair.first());
Assert.assertFalse(pair.second());
Assert.assertTrue(queue.take());
assertOwnershipNodeDoesntExist();
// releasing topic that I don't own
tm.releaseTopic(mkTopic(0), cb, null);
Assert.assertTrue(queue.take());
Assert.assertTrue(bsQueue.isEmpty());
// set listener to return error
listener.setFailure(true);
tm.getOwner(topic, true, addrCbq, null);
pair = bsQueue.take();
Assert.assertEquals(topic, pair.first());
Assert.assertTrue(pair.second());
Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
.getExceptions().iterator().next().getClass());
Assert.assertFalse(tm.topics.contains(topic));
Thread.sleep(100);
assertOwnershipNodeDoesntExist();
}
public void assertOwnershipNodeExists() throws Exception {
byte[] data = zk.getData(tm.hubPath(topic), false, null);
Assert.assertEquals(new HedwigSocketAddress(new String(data)), tm.addr);
}
public void assertOwnershipNodeDoesntExist() throws Exception {
try {
zk.getData(tm.hubPath(topic), false, null);
Assert.assertTrue(false);
} catch (KeeperException e) {
Assert.assertEquals(e.code(), KeeperException.Code.NONODE);
}
}
@Test
public void testZKClientDisconnected() throws Exception {
// First assert ownership of the topic
tm.getOwner(topic, true, addrCbq, null);
Assert.assertEquals(me, check(addrCbq.take()));
// Suspend the ZKTopicManager and make sure calls to getOwner error out
tm.isSuspended = true;
tm.getOwner(topic, true, addrCbq, null);
Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass());
// Release the topic. This should not error out even if suspended.
tm.releaseTopic(topic, cb, null);
Assert.assertTrue(queue.take());
assertOwnershipNodeDoesntExist();
// Restart the ZKTopicManager and make sure calls to getOwner are okay
tm.isSuspended = false;
tm.getOwner(topic, true, addrCbq, null);
Assert.assertEquals(me, check(addrCbq.take()));
assertOwnershipNodeExists();
}
}