| package org.apache.helix.messaging; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import com.google.common.collect.ImmutableList; |
| import org.apache.helix.Criteria; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.HelixProperty; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.MockAccessor; |
| import org.apache.helix.NotificationContext; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyType; |
| import org.apache.helix.SystemPropertyKeys; |
| import org.apache.helix.messaging.handling.HelixTaskResult; |
| import org.apache.helix.messaging.handling.MessageHandler; |
| import org.apache.helix.messaging.handling.MessageHandlerFactory; |
| import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory; |
| import org.apache.helix.messaging.handling.TaskExecutor; |
| import org.apache.helix.mock.MockManager; |
| import org.apache.helix.model.ExternalView; |
| import org.apache.helix.model.LiveInstance.LiveInstanceProperty; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.tools.DefaultIdealStateCalculator; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.testng.Assert; |
| import org.testng.AssertJUnit; |
| import org.testng.annotations.Test; |
| |
| public class TestDefaultMessagingService { |
| class MockHelixManager extends MockManager { |
| class MockDataAccessor extends MockAccessor { |
| |
| @Override |
| public <T extends HelixProperty> T getProperty(PropertyKey key) { |
| |
| PropertyType type = key.getType(); |
| if (type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES) { |
| return (T) new ExternalView(_externalView); |
| } |
| return null; |
| } |
| |
| @Override |
| public <T extends HelixProperty> List<T> getChildValues(PropertyKey key, |
| boolean throwException) { |
| PropertyType type = key.getType(); |
| List<T> result = new ArrayList<T>(); |
| Class<? extends HelixProperty> clazz = key.getTypeClass(); |
| if (type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES) { |
| HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView); |
| result.add((T) typedInstance); |
| return result; |
| } else if (type == PropertyType.LIVEINSTANCES) { |
| return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances); |
| } |
| |
| return result; |
| } |
| } |
| |
| HelixDataAccessor _accessor = new MockDataAccessor(); |
| ZNRecord _externalView; |
| List<String> _instances; |
| List<ZNRecord> _liveInstances; |
| String _db = "DB"; |
| int _replicas = 3; |
| int _partitions = 50; |
| |
| public MockHelixManager() { |
| _liveInstances = new ArrayList<ZNRecord>(); |
| _instances = new ArrayList<String>(); |
| for (int i = 0; i < 5; i++) { |
| String instance = "localhost_" + (12918 + i); |
| _instances.add(instance); |
| ZNRecord metaData = new ZNRecord(instance); |
| metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID() |
| .toString()); |
| _liveInstances.add(metaData); |
| } |
| _externalView = |
| DefaultIdealStateCalculator.calculateIdealState(_instances, _partitions, _replicas, _db, |
| "MASTER", "SLAVE"); |
| |
| } |
| |
| @Override |
| public boolean isConnected() { |
| return true; |
| } |
| |
| @Override |
| public HelixDataAccessor getHelixDataAccessor() { |
| return _accessor; |
| } |
| |
| @Override |
| public String getInstanceName() { |
| return "localhost_12919"; |
| } |
| |
| @Override |
| public InstanceType getInstanceType() { |
| return InstanceType.PARTICIPANT; |
| } |
| } |
| |
| class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory { |
| class TestMessageHandler extends MessageHandler { |
| |
| public TestMessageHandler(Message message, NotificationContext context) { |
| super(message, context); |
| // TODO Auto-generated constructor stub |
| } |
| |
| @Override |
| public HelixTaskResult handleMessage() throws InterruptedException { |
| HelixTaskResult result = new HelixTaskResult(); |
| result.setSuccess(true); |
| return result; |
| } |
| |
| @Override |
| public void onError(Exception e, ErrorCode code, ErrorType type) { |
| // TODO Auto-generated method stub |
| |
| } |
| } |
| |
| @Override |
| public MessageHandler createHandler(Message message, NotificationContext context) { |
| // TODO Auto-generated method stub |
| return new TestMessageHandler(message, context); |
| } |
| |
| @Override public List<String> getMessageTypes() { |
| return ImmutableList.of("TestingMessageHandler"); |
| } |
| |
| @Override |
| public void reset() { |
| // TODO Auto-generated method stub |
| |
| } |
| } |
| |
| class TestStateTransitionHandlerFactory implements MultiTypeMessageHandlerFactory { |
| |
| @Override |
| public MessageHandler createHandler(Message message, NotificationContext context) { |
| return null; |
| } |
| |
| @Override |
| public List<String> getMessageTypes() { |
| return ImmutableList.of(Message.MessageType.STATE_TRANSITION.name(), |
| Message.MessageType.STATE_TRANSITION_CANCELLATION.name(), |
| Message.MessageType.CONTROLLER_MSG.name()); |
| } |
| |
| @Override |
| public void reset() { |
| |
| } |
| } |
| |
| class MockDefaultMessagingService extends DefaultMessagingService { |
| public MockDefaultMessagingService(HelixManager manager) { |
| super(manager); |
| } |
| |
| public Map<String, MessageHandlerFactory> getMessageHandlerFactoryMap() { |
| return _messageHandlerFactoriestobeAdded; |
| } |
| } |
| |
| @Test() |
| public void TestMessageSend() { |
| HelixManager manager = new MockHelixManager(); |
| DefaultMessagingService svc = new DefaultMessagingService(manager); |
| TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); |
| svc.registerMessageHandlerFactory(factory.getMessageType(), factory); |
| |
| Criteria recipientCriteria = new Criteria(); |
| recipientCriteria.setInstanceName("localhost_12919"); |
| recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); |
| recipientCriteria.setSelfExcluded(true); |
| |
| Message template = new Message(factory.getMessageType(), UUID.randomUUID().toString()); |
| AssertJUnit.assertEquals(0, svc.send(recipientCriteria, template)); |
| |
| recipientCriteria.setSelfExcluded(false); |
| AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template)); |
| |
| // all instances, all partitions |
| recipientCriteria.setSelfExcluded(false); |
| recipientCriteria.setInstanceName("%"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template)); |
| |
| // all instances, all partitions, use * instead of % |
| recipientCriteria.setSelfExcluded(false); |
| recipientCriteria.setInstanceName("*"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("*"); |
| AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template)); |
| |
| // tail pattern |
| recipientCriteria.setSelfExcluded(false); |
| recipientCriteria.setInstanceName("localhost%"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template)); |
| |
| // exclude this instance, send to all others for all partitions |
| recipientCriteria.setSelfExcluded(true); |
| recipientCriteria.setInstanceName("%"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template)); |
| |
| // single instance, all partitions |
| recipientCriteria.setSelfExcluded(true); |
| recipientCriteria.setInstanceName("localhost_12920"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template)); |
| |
| // single character wildcards |
| recipientCriteria.setSelfExcluded(true); |
| recipientCriteria.setInstanceName("l_calhost_12_20"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template)); |
| |
| // head pattern |
| recipientCriteria.setSelfExcluded(true); |
| recipientCriteria.setInstanceName("%12920"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template)); |
| |
| // middle pattern |
| recipientCriteria.setSelfExcluded(true); |
| recipientCriteria.setInstanceName("l%_12920"); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template)); |
| |
| // send to a controller |
| recipientCriteria.setSelfExcluded(true); |
| recipientCriteria.setInstanceName("localhost_12920"); |
| recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER); |
| recipientCriteria.setResource("DB"); |
| recipientCriteria.setPartition("%"); |
| AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template)); |
| } |
| |
| @Test public void testMultipleMessageTypeRegisteration() { |
| HelixManager manager = new MockManager(); |
| MockDefaultMessagingService svc = new MockDefaultMessagingService(manager); |
| TestStateTransitionHandlerFactory factory = new TestStateTransitionHandlerFactory(); |
| svc.registerMessageHandlerFactory(factory.getMessageTypes(), factory); |
| |
| Assert.assertTrue( |
| svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.STATE_TRANSITION.name())); |
| Assert.assertTrue(svc.getMessageHandlerFactoryMap() |
| .containsKey(Message.MessageType.STATE_TRANSITION_CANCELLATION.name())); |
| Assert.assertTrue( |
| svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.CONTROLLER_MSG.name())); |
| } |
| |
| @Test |
| public void testTaskThreadpoolResetTimeoutProperty() { |
| HelixManager manager = new MockManager(); |
| System.setProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT, "300"); |
| MockDefaultMessagingService svc = new MockDefaultMessagingService(manager); |
| Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), 300); |
| System.clearProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT); |
| svc = new MockDefaultMessagingService(new MockManager()); |
| Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS); |
| } |
| } |