blob: d24a86dfae1a8db4d8335b1a0ddde5135f514c7e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hedwig.server.handlers;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
import com.google.protobuf.ByteString;
import org.apache.hedwig.StubCallback;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.delivery.ChannelEndPoint;
import org.apache.hedwig.server.delivery.StubDeliveryManager;
import org.apache.hedwig.server.delivery.StubDeliveryManager.StartServingRequest;
import org.apache.hedwig.server.netty.WriteRecordingChannel;
import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.subscriptions.StubSubscriptionManager;
import org.apache.hedwig.server.subscriptions.TrueFilter;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
import org.apache.hedwig.util.ConcurrencyUtils;
import junit.framework.TestCase;
public class TestSubUnsubHandler extends TestCase {
SubscribeHandler sh;
StubDeliveryManager dm;
StubSubscriptionManager sm;
ByteString topic = ByteString.copyFromUtf8("topic");
WriteRecordingChannel channel;
SubscribeRequest subRequestPrototype;
PubSubRequest pubSubRequestPrototype;
ByteString subscriberId;
UnsubscribeHandler ush;
@Override
protected void setUp() throws Exception {
super.setUp();
ServerConfiguration conf = new ServerConfiguration();
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
TopicManager tm = new TrivialOwnAllTopicManager(conf, executor);
dm = new StubDeliveryManager();
PersistenceManager pm = LocalDBPersistenceManager.instance();
sm = new StubSubscriptionManager(tm, pm, conf, executor);
sh = new SubscribeHandler(tm, dm, pm, sm, conf);
channel = new WriteRecordingChannel();
subscriberId = ByteString.copyFromUtf8("subId");
subRequestPrototype = SubscribeRequest.newBuilder().setSubscriberId(subscriberId).build();
pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
ush = new UnsubscribeHandler(tm, conf, sm, dm);
}
@Test
public void testNoSubscribeRequest() {
sh.handleRequestAtOwner(PubSubRequest.newBuilder(pubSubRequestPrototype).clearSubscribeRequest().build(),
channel);
assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
.getStatusCode());
}
@Test
public void testSuccessCase() {
StubCallback<Void> callback = new StubCallback<Void>();
sm.acquiredTopic(topic, callback, null);
assertNull(ConcurrencyUtils.take(callback.queue).right());
sh.handleRequestAtOwner(pubSubRequestPrototype, channel);
assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
// make sure the channel was put in the maps
assertEquals(new TopicSubscriber(topic, subscriberId), sh.channel2sub.get(channel));
assertEquals(channel, sh.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
// make sure delivery was started
StartServingRequest startRequest = (StartServingRequest) dm.lastRequest.poll();
assertEquals(channel, ((ChannelEndPoint) startRequest.endPoint).getChannel());
assertEquals(false, startRequest.isHubSubscriber);
assertEquals(TrueFilter.class, startRequest.filter.getClass());
assertEquals(1, startRequest.seqIdToStartFrom.getLocalComponent());
assertEquals(subscriberId, startRequest.subscriberId);
assertEquals(topic, startRequest.topic);
// make sure subscription was registered
StubCallback<MessageSeqId> callback1 = new StubCallback<MessageSeqId>();
sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
null);
assertEquals(PubSubException.ClientAlreadySubscribedException.class, ConcurrencyUtils.take(callback1.queue)
.right().getClass());
// trying to subscribe again should throw an error
WriteRecordingChannel dupChannel = new WriteRecordingChannel();
sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
assertEquals(StatusCode.TOPIC_BUSY, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
// after disconnecting the channel, subscribe should work again
sh.channelDisconnected(channel);
dupChannel = new WriteRecordingChannel();
sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
assertEquals(StatusCode.SUCCESS, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
// test unsubscribe
channel = new WriteRecordingChannel();
ush.handleRequestAtOwner(pubSubRequestPrototype, channel);
assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
.getStatusCode());
PubSubRequest unsubRequest = PubSubRequest.newBuilder(pubSubRequestPrototype).setUnsubscribeRequest(
UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
channel = new WriteRecordingChannel();
dm.lastRequest.clear();
ush.handleRequestAtOwner(unsubRequest, channel);
assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
// make sure delivery has been stopped
assertEquals(new TopicSubscriber(topic, subscriberId), dm.lastRequest.poll());
// make sure the info is gone from the sm
StubCallback<MessageSeqId> callback2 = new StubCallback<MessageSeqId>();
sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
null);
assertEquals(PubSubException.ClientNotSubscribedException.class, ConcurrencyUtils.take(callback2.queue).right()
.getClass());
}
}