blob: 08994de3b25720af73b6d79ac5b661e040c53deb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hedwig.client;
import java.util.concurrent.SynchronousQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.HedwigClient;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.ConcurrencyUtils;
public class TestPubSubClient extends PubSubServerStandAloneTestBase {
// Client side variables
protected HedwigClient client;
protected Publisher publisher;
protected Subscriber subscriber;
// SynchronousQueues to verify async calls
private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
// Test implementation of Callback for async client actions.
class TestCallback implements Callback<Void> {
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
new Thread(new Runnable() {
@Override
public void run() {
if (logger.isDebugEnabled())
logger.debug("Operation finished!");
ConcurrencyUtils.put(queue, true);
}
}).start();
}
@Override
public void operationFailed(Object ctx, final PubSubException exception) {
new Thread(new Runnable() {
@Override
public void run() {
logger.error("Operation failed!", exception);
ConcurrencyUtils.put(queue, false);
}
}).start();
}
}
// Test implementation of subscriber's message handler.
class TestMessageHandler implements MessageHandler {
public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
Object context) {
new Thread(new Runnable() {
@Override
public void run() {
if (logger.isDebugEnabled())
logger.debug("Consume operation finished successfully!");
ConcurrencyUtils.put(consumeQueue, true);
}
}).start();
callback.operationFinished(context, null);
}
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
client = new HedwigClient(new ClientConfiguration());
publisher = client.getPublisher();
subscriber = client.getSubscriber();
}
@Override
@After
public void tearDown() throws Exception {
client.close();
super.tearDown();
}
@Test
public void testSyncPublish() throws Exception {
boolean publishSuccess = true;
try {
publisher.publish(ByteString.copyFromUtf8("mySyncTopic"), Message.newBuilder().setBody(
ByteString.copyFromUtf8("Hello Sync World!")).build());
} catch (Exception e) {
publishSuccess = false;
}
assertTrue(publishSuccess);
}
@Test
public void testAsyncPublish() throws Exception {
publisher.asyncPublish(ByteString.copyFromUtf8("myAsyncTopic"), Message.newBuilder().setBody(
ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
assertTrue(queue.take());
}
@Test
public void testMultipleAsyncPublish() throws Exception {
ByteString topic1 = ByteString.copyFromUtf8("myFirstTopic");
ByteString topic2 = ByteString.copyFromUtf8("myNewTopic");
publisher.asyncPublish(topic1, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello World!")).build(),
new TestCallback(), null);
assertTrue(queue.take());
publisher.asyncPublish(topic2, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello on new topic!"))
.build(), new TestCallback(), null);
assertTrue(queue.take());
publisher.asyncPublish(topic1, Message.newBuilder().setBody(
ByteString.copyFromUtf8("Hello Again on old topic!")).build(), new TestCallback(), null);
assertTrue(queue.take());
}
@Test
public void testSyncSubscribe() throws Exception {
boolean subscribeSuccess = true;
try {
subscriber.subscribe(ByteString.copyFromUtf8("mySyncSubscribeTopic"), ByteString.copyFromUtf8("1"), CreateOrAttach.CREATE_OR_ATTACH);
} catch (Exception e) {
subscribeSuccess = false;
}
assertTrue(subscribeSuccess);
}
@Test
public void testAsyncSubscribe() throws Exception {
subscriber.asyncSubscribe(ByteString.copyFromUtf8("myAsyncSubscribeTopic"), ByteString.copyFromUtf8("1"),
CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
assertTrue(queue.take());
}
@Test
public void testSubscribeAndConsume() throws Exception {
ByteString topic = ByteString.copyFromUtf8("myConsumeTopic");
ByteString subscriberId = ByteString.copyFromUtf8("1");
subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
assertTrue(queue.take());
// Start delivery for the subscriber
subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
// Now publish some messages for the topic to be consumed by the
// subscriber.
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #2")).build(),
new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #3")).build(),
new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #4")).build(),
new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #5")).build(),
new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
}
@Test
public void testAsyncSubscribeAndUnsubscribe() throws Exception {
ByteString topic = ByteString.copyFromUtf8("myAsyncUnsubTopic");
ByteString subscriberId = ByteString.copyFromUtf8("1");
subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
assertTrue(queue.take());
subscriber.asyncUnsubscribe(topic, subscriberId, new TestCallback(), null);
assertTrue(queue.take());
}
@Test
public void testSyncUnsubscribeWithoutSubscription() throws Exception {
boolean unsubscribeSuccess = false;
try {
subscriber.unsubscribe(ByteString.copyFromUtf8("mySyncUnsubTopic"), ByteString.copyFromUtf8("1"));
} catch (ClientNotSubscribedException e) {
unsubscribeSuccess = true;
} catch (Exception ex) {
unsubscribeSuccess = false;
}
assertTrue(unsubscribeSuccess);
}
@Test
public void testAsyncSubscribeAndCloseSubscription() throws Exception {
ByteString topic = ByteString.copyFromUtf8("myAsyncSubAndCloseSubTopic");
ByteString subscriberId = ByteString.copyFromUtf8("1");
subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
assertTrue(queue.take());
subscriber.closeSubscription(topic, subscriberId);
assertTrue(true);
}
}