blob: 956d31cfd63543e1593839ef18adc761b2d2a752 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hedwig.server.topics;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.HedwigClient;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.server.HedwigHubTestBase;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.ConcurrencyUtils;
import org.junit.Test;
import com.google.protobuf.ByteString;
public class TestConcurrentTopicAcquisition extends HedwigHubTestBase {
// Client variables
protected HedwigClient client;
protected Publisher publisher;
protected Subscriber subscriber;
final LinkedBlockingQueue<ByteString> subscribers =
new LinkedBlockingQueue<ByteString>();
final ByteString topic = ByteString.copyFromUtf8("concurrent-topic");
final int numSubscribers = 300;
final AtomicInteger numDone = new AtomicInteger(0);
// SynchronousQueues to verify async calls
private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
class SubCallback implements Callback<Void> {
ByteString subId;
public SubCallback(ByteString subId) {
this.subId = subId;
}
@Override
public void operationFinished(Object ctx,
Void resultOfOperation) {
if (logger.isDebugEnabled()) {
logger.debug("subscriber " + subId.toStringUtf8() + " succeed.");
}
int done = numDone.incrementAndGet();
if (done == numSubscribers) {
ConcurrencyUtils.put(queue, false);
}
}
@Override
public void operationFailed(Object ctx,
PubSubException exception) {
if (logger.isDebugEnabled()) {
logger.debug("subscriber " + subId.toStringUtf8() + " failed : ", exception);
}
ConcurrencyUtils.put(subscribers, subId);
// ConcurrencyUtils.put(queue, false);
}
}
@Override
public void setUp() throws Exception {
super.setUp();
client = new HedwigClient(new ClientConfiguration());
publisher = client.getPublisher();
subscriber = client.getSubscriber();
}
@Override
public void tearDown() throws Exception {
// sub.interrupt();
// sub.join();
client.close();
super.tearDown();
}
@Test
public void testTopicAcquistion() throws Exception {
logger.info("Start concurrent topic acquistion test.");
// let one bookie down to cause not enough bookie exception
logger.info("Tear down one bookie server.");
bktb.tearDownOneBookieServer();
// In current implementation, the first several subscriptions will succeed to put topic in topic manager set,
// because the tear down bookie server's zk node need time to disappear
// some subscriptions will create ledger successfully, then other subscriptions will fail.
// the race condition will be: topic manager own topic but persistence manager doesn't
// 300 subscribers subscribe to a same topic
final AtomicBoolean inRedirectLoop = new AtomicBoolean(false);
numDone.set(0);
for (int i=0; i<numSubscribers; i++) {
ByteString subId = ByteString.copyFromUtf8("sub-" + i);
if (logger.isDebugEnabled()) {
logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
}
subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
new Callback<Void>() {
private void tick() {
if (numDone.incrementAndGet() == numSubscribers) {
ConcurrencyUtils.put(queue, true);
}
}
@Override
public void operationFinished(Object ctx,
Void resultOfOperation) {
tick();
}
@Override
public void operationFailed(Object ctx,
PubSubException exception) {
if (exception instanceof PubSubException.ServiceDownException) {
String msg = exception.getMessage();
if (msg.indexOf("ServerRedirectLoopException") > 0) {
inRedirectLoop.set(true);
}
if (logger.isDebugEnabled()) {
logger.debug("Operation failed : ", exception);
}
}
tick();
}
},
null);
}
queue.take();
// TODO: remove comment after we fix the issue
// Assert.assertEquals(false, inRedirectLoop.get());
// start a thread to send subscriptions
numDone.set(0);
Thread sub = new Thread(new Runnable() {
@Override
public void run() {
logger.info("sub thread started");
try {
// 100 subscribers subscribe to a same topic
for (int i=0; i<numSubscribers; i++) {
ByteString subscriberId = ByteString.copyFromUtf8("sub-" + i);
subscribers.put(subscriberId);
}
ByteString subId;
while (true) {
subId = subscribers.take();
if (logger.isDebugEnabled()) {
logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
}
subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
new SubCallback(subId), null);
}
// subscriber.asyncSubscribe(topic, subscriberId, mode, callback, context)
} catch (InterruptedException ie) {
// break
logger.warn("Interrupted : ", ie);
}
}
});
sub.start();
Thread.sleep(2000);
// start a new bookie server
logger.info("start new bookie server");
bktb.startUpNewBookieServer();
// hope that all the subscriptions will be OK
queue.take();
}
}