blob: 7b239908a9b801394d4e0016e9c4d7a575b0f3b9 [file] [log] [blame]
package org.apache.helix.messaging.handling;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import com.google.common.collect.ImmutableList;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.mock.MockManager;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestHelixTaskExecutor {
@BeforeClass
public void beforeClass() {
System.out.println("START " + TestHelper.getTestClassName());
}
@AfterClass
public void afterClass() {
System.out.println("End " + TestHelper.getTestClassName());
}
public static class MockClusterManager extends MockManager {
@Override
public String getSessionId() {
return "123";
}
}
class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
final int _messageDelay;
int _handlersCreated = 0;
ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
ConcurrentSkipListSet<String> _completedMsgIds = new ConcurrentSkipListSet<>();
TestMessageHandlerFactory(int messageDelay) {
_messageDelay = messageDelay;
}
TestMessageHandlerFactory() {
_messageDelay = 100;
}
class TestMessageHandler extends MessageHandler {
public TestMessageHandler(Message message, NotificationContext context) {
super(message, context);
}
@Override
public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
_processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
Thread.sleep(_messageDelay);
result.setSuccess(true);
_completedMsgIds.add(_message.getMsgId());
return result;
}
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
}
}
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
if (message.getMsgSubType() != null && message.getMsgSubType().equals("EXCEPTION")) {
throw new HelixException("Test Message handler exception, can ignore");
}
_handlersCreated++;
return new TestMessageHandler(message, context);
}
@Override
public List<String> getMessageTypes() {
return Collections.singletonList("TestingMessageHandler");
}
@Override
public void reset() {
}
}
class TestMessageHandlerFactory2 extends TestMessageHandlerFactory {
@Override
public List<String> getMessageTypes() {
return ImmutableList.of("TestingMessageHandler2");
}
}
class CancellableHandlerFactory implements MultiTypeMessageHandlerFactory {
int _handlersCreated = 0;
ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<String, String>();
ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<String, String>();
class CancellableHandler extends MessageHandler {
public CancellableHandler(Message message, NotificationContext context) {
super(message, context);
}
public boolean _interrupted = false;
@Override
public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
int sleepTimes = 15;
if (_message.getRecord().getSimpleFields().containsKey("Cancelcount")) {
sleepTimes = 10;
}
_processingMsgIds.put(_message.getMsgId(), _message.getMsgId());
try {
for (int i = 0; i < sleepTimes; i++) {
Thread.sleep(100);
}
} catch (InterruptedException e) {
_interrupted = true;
_timedOutMsgIds.put(_message.getMsgId(), "");
result.setInterrupted(true);
if (!_message.getRecord().getSimpleFields().containsKey("Cancelcount")) {
_message.getRecord().setSimpleField("Cancelcount", "1");
} else {
int c = Integer.parseInt(_message.getRecord().getSimpleField("Cancelcount"));
_message.getRecord().setSimpleField("Cancelcount", "" + (c + 1));
}
throw e;
}
_processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
result.setSuccess(true);
return result;
}
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
_message.getRecord().setSimpleField("exception", e.getMessage());
}
}
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
_handlersCreated++;
return new CancellableHandler(message, context);
}
@Override public List<String> getMessageTypes() {
return ImmutableList.of("Cancellable");
}
@Override
public void reset() {
_handlersCreated = 0;
_processedMsgIds.clear();
_processingMsgIds.clear();
_timedOutMsgIds.clear();
}
}
class TestStateTransitionHandlerFactory implements MultiTypeMessageHandlerFactory {
ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
private final String _msgType;
private final long _delay;
public TestStateTransitionHandlerFactory(String msgType) {
this(msgType, -1);
}
public TestStateTransitionHandlerFactory(String msgType, long delay) {
_msgType = msgType;
_delay = delay;
}
class TestStateTransitionMessageHandler extends HelixStateTransitionHandler {
public TestStateTransitionMessageHandler(Message message, NotificationContext context,
CurrentState currentStateDelta) {
super(new StateModelFactory<StateModel>() {
// Empty no-op state model factory is good enough for the test.
}, new StateModel() {
// Empty no-op state model is good enough for the test.
}, message, context, currentStateDelta);
}
@Override
public HelixTaskResult handleMessage() {
HelixTaskResult result = new HelixTaskResult();
_processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
if (_delay > 0) {
System.out.println("Sleeping..." + _delay);
try {
Thread.sleep(_delay);
} catch (Exception e) {
assert (false);
}
}
result.setSuccess(true);
return result;
}
@Override
public StaleMessageValidateResult staleMessageValidator() {
return super.staleMessageValidator();
}
}
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
CurrentState currentStateDelta = new CurrentState(message.getResourceName());
currentStateDelta.setSessionId(message.getTgtSessionId());
currentStateDelta.setStateModelDefRef(message.getStateModelDef());
currentStateDelta.setStateModelFactoryName(message.getStateModelFactoryName());
currentStateDelta.setBucketSize(message.getBucketSize());
if (!message.getResourceName().equals("testStaledMessageResource")) {
// set the current state same as from state in the message in test testStaledMessage.
currentStateDelta.setState(message.getPartitionName(), "SLAVE");
} else {
// set the current state same as to state in the message in test testStaledMessage.
currentStateDelta.setState(message.getPartitionName(), "MASTER");
}
return new TestStateTransitionMessageHandler(message, context, currentStateDelta);
}
@Override
public List<String> getMessageTypes() {
return ImmutableList.of(_msgType);
}
@Override
public void reset() {
}
}
@Test()
public void testNormalMsgExecution() throws InterruptedException {
System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
for (String type : factory2.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory2);
}
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
msgList.add(msg);
}
int nMsgs2 = 6;
for (int i = 0; i < nMsgs2; i++) {
Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
msgList.add(msg);
}
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("someInstance", msgList, changeContext);
Thread.sleep(1000);
AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
AssertJUnit.assertTrue(factory2._processedMsgIds.size() == nMsgs2);
AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
AssertJUnit.assertTrue(factory2._handlersCreated == nMsgs2);
for (Message record : msgList) {
AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(record.getId())
|| factory2._processedMsgIds.containsKey(record.getId()));
AssertJUnit.assertFalse(factory._processedMsgIds.containsKey(record.getId())
&& factory2._processedMsgIds.containsKey(record.getId()));
}
System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
}
@Test()
public void testDuplicatedMessage() throws InterruptedException {
System.out.println("START TestHelixTaskExecutor.testDuplicatedMessage()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
TestStateTransitionHandlerFactory stateTransitionFactory =
new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
stateTransitionFactory);
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs = 3;
String instanceName = manager.getInstanceName();
for (int i = 0; i < nMsgs; i++) {
Message msg =
new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setCreateTimeStamp((long) i);
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setPartitionName("Partition");
msg.setResourceName("Resource");
msg.setStateModelDef("DummyMasterSlave");
msg.setFromState("SLAVE");
msg.setToState("MASTER");
dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
msgList.add(msg);
}
AssertJUnit
.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
nMsgs);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage(instanceName, msgList, changeContext);
Thread.sleep(200);
// only 1 message is left over - state transition takes 1sec
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
1);
// While a state transition message is going on, another state transition message for same
// resource / partition comes in, it should be discarded by message handler
// Mock accessor is modifying message state in memory so we set it back to NEW
msgList.get(2).setMsgState(MessageState.NEW);
dataAccessor.setProperty(msgList.get(2).getKey(keyBuilder, instanceName), msgList.get(2));
executor.onMessage(instanceName, Arrays.asList(msgList.get(2)), changeContext);
Thread.sleep(200);
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
1);
Thread.sleep(1000);
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
0);
System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
}
@Test()
public void testStaledMessage() throws InterruptedException {
System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
TestStateTransitionHandlerFactory stateTransitionFactory =
new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
stateTransitionFactory);
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs = 1;
String instanceName = manager.getInstanceName();
for (int i = 0; i < nMsgs; i++) {
Message msg =
new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setCreateTimeStamp((long) i);
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setPartitionName("Partition");
msg.setResourceName("testStaledMessageResource");
msg.setStateModelDef("DummyMasterSlave");
msg.setFromState("SLAVE");
msg.setToState("MASTER");
dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
msgList.add(msg);
}
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
nMsgs);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage(instanceName, msgList, changeContext);
Thread.sleep(200);
// The message should be ignored since toState is the same as current state.
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
0);
System.out.println("END TestHelixTaskExecutor.testStaledMessage()");
}
@Test()
public void testUnknownTypeMsgExecution() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msgList.add(msg);
}
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msgList.add(msg);
}
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("someInstance", msgList, changeContext);
Thread.sleep(1000);
AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
AssertJUnit.assertTrue(factory2._handlersCreated == 0);
for (Message message : msgList) {
if (factory.getMessageTypes().contains(message.getMsgType())) {
AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
}
}
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
public void testMsgSessionId() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
for (String type : factory2.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory2);
}
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("");
msgList.add(msg);
}
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("some other session id");
msg.setTgtName("");
msgList.add(msg);
}
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("someInstance", msgList, changeContext);
Thread.sleep(1000);
AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
AssertJUnit.assertTrue(factory2._handlersCreated == 0);
for (Message message : msgList) {
if (factory.getMessageTypes().contains(message.getMsgType())) {
AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
}
}
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
public void testCreateHandlerException() throws Exception {
System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
NotificationContext changeContext = new NotificationContext(manager);
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
// Sending message without registering the factory.
// The message won't be processed since creating handler returns null.
int nMsgs1 = 5;
List<Message> msgList = new ArrayList<>();
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
dataAccessor.setProperty(keyBuilder.message(manager.getInstanceName(), msg.getMsgId()), msg);
msgList.add(msg);
}
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage(manager.getInstanceName(), Collections.emptyList(), changeContext);
for (Message message : msgList) {
message = dataAccessor
.getProperty(keyBuilder.message(manager.getInstanceName(), message.getMsgId()));
Assert.assertNotNull(message);
Assert.assertEquals(message.getMsgState(), MessageState.NEW);
Assert.assertEquals(message.getRetryCount(), 0);
}
// Test with a factory that throws Exception on certain message. The invalid message will be
// remain UNPROCESSABLE due to the Exception.
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
Message exceptionMsg =
new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
exceptionMsg.setTgtSessionId(manager.getSessionId());
exceptionMsg.setMsgSubType("EXCEPTION");
exceptionMsg.setTgtName("Localhost_1123");
exceptionMsg.setSrcName("127.101.1.23_2234");
exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
dataAccessor.setProperty(keyBuilder.message(manager.getInstanceName(), exceptionMsg.getMsgId()),
exceptionMsg);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage(manager.getInstanceName(), Collections.emptyList(), changeContext);
Assert.assertTrue(TestHelper.verify(() -> {
Message tmpExceptionMsg = dataAccessor
.getProperty(keyBuilder.message(manager.getInstanceName(), exceptionMsg.getMsgId()));
if (tmpExceptionMsg == null || !tmpExceptionMsg.getMsgState()
.equals(MessageState.UNPROCESSABLE) || tmpExceptionMsg.getRetryCount() != -1) {
return false;
}
return true;
}, TestHelper.WAIT_DURATION),
"The exception message should be retied once and in UNPROCESSABLE state.");
Assert.assertTrue(TestHelper.verify(() -> {
for (Message message : msgList) {
message = dataAccessor
.getProperty(keyBuilder.message(manager.getInstanceName(), message.getMsgId()));
if (message != null) {
return false;
}
}
return true;
}, TestHelper.WAIT_DURATION), "The normal messages should be all processed normally.");
Assert.assertEquals(factory._processedMsgIds.size(), nMsgs1);
Assert.assertEquals(factory._handlersCreated, nMsgs1);
System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
}
@Test()
public void testTaskCancellation() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
CancellableHandlerFactory factory = new CancellableHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 0;
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msgList.add(msg);
}
List<Message> msgListToCancel = new ArrayList<Message>();
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msgList.add(msg);
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msgListToCancel.add(msg);
}
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("someInstance", msgList, changeContext);
Thread.sleep(500);
for (int i = 0; i < nMsgs2; i++) {
// executor.cancelTask(msgListToCancel.get(i), changeContext);
HelixTask task = new HelixTask(msgListToCancel.get(i), changeContext, null, null);
executor.cancelTask(task);
}
Thread.sleep(1500);
AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1 + nMsgs2);
AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
for (Message message : msgList) {
if (factory.getMessageTypes().contains(message.getMsgType())) {
AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
}
}
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
public void testShutdown() throws InterruptedException {
System.out.println("START TestCMTaskExecutor.testShutdown()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
for (String type : factory2.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory2);
}
CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
for (String type : factory3.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory3);
}
int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
List<Message> msgList = new ArrayList<Message>();
for (int i = 0; i < nMsg1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msgList.add(msg);
}
for (int i = 0; i < nMsg2; i++) {
Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msgList.add(msg);
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msgList.add(msg);
}
for (int i = 0; i < nMsg3; i++) {
Message msg = new Message(factory3.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msgList.add(msg);
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msgList.add(msg);
}
NotificationContext changeContext = new NotificationContext(manager);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("some", msgList, changeContext);
Thread.sleep(500);
for (ExecutorService svc : executor._executorMap.values()) {
Assert.assertFalse(svc.isShutdown());
}
Assert.assertTrue(factory._processedMsgIds.size() > 0);
executor.shutdown();
for (ExecutorService svc : executor._executorMap.values()) {
Assert.assertTrue(svc.isShutdown());
}
System.out.println("END TestCMTaskExecutor.testShutdown()");
}
@Test(dependsOnMethods = "testShutdown")
public void testHandlerResetTimeout() throws Exception {
System.out.println("START TestCMTaskExecutor.testHandlerResetTimeout()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
int messageDelay = 2 * 1000; // 2 seconds
TestMessageHandlerFactory factory = new TestMessageHandlerFactory(messageDelay);
// Execute a message with short reset timeout
int shortTimeout = 100; // 100 ms
executor.registerMessageHandlerFactory(factory, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, shortTimeout);
final Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
NotificationContext changeContext = new NotificationContext(manager);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("some", Arrays.asList(msg), changeContext);
Assert.assertTrue(
TestHelper.verify(() -> factory._processedMsgIds.containsKey(msg.getMsgId()), TestHelper.WAIT_DURATION));
executor.shutdown();
for (ExecutorService svc : executor._executorMap.values()) {
Assert.assertTrue(svc.isShutdown());
}
Assert.assertEquals(factory._completedMsgIds.size(), 0);
// Execute a message with proper reset timeout, so it will wait enough time until the message is processed.
executor = new HelixTaskExecutor();
int longTimeout = messageDelay * 2; // 4 seconds
executor.registerMessageHandlerFactory(factory, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, longTimeout);
final Message msg2 = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg2.setTgtSessionId("*");
msg2.setTgtName("Localhost_1123");
msg2.setSrcName("127.101.1.23_2234");
executor.onMessage("some", Arrays.asList(msg2), changeContext);
Assert.assertTrue(
TestHelper.verify(() -> factory._processedMsgIds.containsKey(msg2.getMsgId()), TestHelper.WAIT_DURATION));
executor.shutdown();
for (ExecutorService svc : executor._executorMap.values()) {
Assert.assertTrue(svc.isShutdown());
}
Assert.assertEquals(factory._completedMsgIds.size(), 1);
Assert.assertTrue(factory._completedMsgIds.contains(msg2.getMsgId()));
System.out.println("END TestCMTaskExecutor.testHandlerResetTimeout()");
}
@Test()
public void testNoRetry() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
CancellableHandlerFactory factory = new CancellableHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
int nMsgs2 = 4;
// Test the case in which retry = 0
for (int i = 0; i < nMsgs2; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setExecutionTimeout((i + 1) * 600);
msgList.add(msg);
}
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("someInstance", msgList, changeContext);
Thread.sleep(4000);
AssertJUnit.assertTrue(factory._handlersCreated == nMsgs2);
AssertJUnit.assertEquals(factory._timedOutMsgIds.size(), 2);
// AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
for (int i = 0; i < nMsgs2 - 2; i++) {
if (factory.getMessageTypes().contains(msgList.get(i).getMsgType())) {
AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields()
.containsKey("Cancelcount"));
AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
}
}
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
public void testRetryOnce() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
CancellableHandlerFactory factory = new CancellableHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
// Test the case that the message are executed for the second time
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setExecutionTimeout((i + 1) * 600);
msg.setRetryCount(1);
msgList.add(msg);
}
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("someInstance", msgList, changeContext);
Thread.sleep(3500);
AssertJUnit.assertEquals(factory._processedMsgIds.size(), 3);
AssertJUnit.assertTrue(msgList.get(0).getRecord().getSimpleField("Cancelcount").equals("2"));
AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
AssertJUnit.assertEquals(factory._timedOutMsgIds.size(), 2);
AssertJUnit.assertTrue(executor._taskMap.size() == 0);
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test
public void testStateTransitionCancellationMsg() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestStateTransitionHandlerFactory stateTransitionFactory = new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name());
TestStateTransitionHandlerFactory cancelFactory = new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION_CANCELLATION
.name());
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), stateTransitionFactory);
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION_CANCELLATION.name(), cancelFactory);
NotificationContext changeContext = new NotificationContext(manager);
List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
msg1.setTgtSessionId("*");
msg1.setPartitionName("P1");
msg1.setResourceName("R1");
msg1.setTgtName("Localhost_1123");
msg1.setSrcName("127.101.1.23_2234");
msg1.setFromState("SLAVE");
msg1.setToState("MASTER");
msgList.add(msg1);
Message msg2 = new Message(Message.MessageType.STATE_TRANSITION_CANCELLATION, UUID.randomUUID().toString());
msg2.setTgtSessionId("*");
msg2.setPartitionName("P1");
msg2.setResourceName("R1");
msg2.setTgtName("Localhost_1123");
msg2.setSrcName("127.101.1.23_2234");
msg2.setFromState("SLAVE");
msg2.setToState("MASTER");
msgList.add(msg2);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("someInstance", msgList, changeContext);
Thread.sleep(3000);
AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0);
AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0);
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test
public void testMessageReadOptimization() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<String> messageIds = new ArrayList<>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
accessor.setProperty(keyBuilder.message("someInstance", msg.getId()), msg);
messageIds.add(msg.getId());
}
NotificationContext changeContext = new NotificationContext(manager);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
// Simulate read message already, then processing message. Should read and handle no message.
executor._knownMessageIds.addAll(messageIds);
executor.onMessage("someInstance", Collections.EMPTY_LIST, changeContext);
Thread.sleep(3000);
AssertJUnit.assertEquals(0, factory._processedMsgIds.size());
executor._knownMessageIds.clear();
// Processing message normally
executor.onMessage("someInstance", Collections.EMPTY_LIST, changeContext);
Thread.sleep(3000);
AssertJUnit.assertEquals(nMsgs1, factory._processedMsgIds.size());
// After all messages are processed, _knownMessageIds should be empty.
Assert.assertTrue(executor._knownMessageIds.isEmpty());
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test
public void testNoWriteReadStateForRemovedMessage()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
for (String type : factory.getMessageTypes()) {
executor.registerMessageHandlerFactory(type, factory);
}
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
String instanceName = "someInstance";
List<String> messageIds = new ArrayList<>();
List<Message> messages = new ArrayList<>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
accessor.setProperty(keyBuilder.message(instanceName, msg.getId()), msg);
messageIds.add(msg.getId());
// Set for testing the update operation later
msg.setMsgState(MessageState.READ);
messages.add(msg);
}
Method updateMessageState = HelixTaskExecutor.class
.getDeclaredMethod("updateMessageState", Collection.class, HelixDataAccessor.class,
String.class);
updateMessageState.setAccessible(true);
updateMessageState.invoke(executor, messages, accessor, instanceName);
Assert.assertEquals(accessor.getChildNames(keyBuilder.messages(instanceName)).size(), nMsgs1);
accessor.removeProperty(keyBuilder.message(instanceName, messageIds.get(0)));
System.out.println(accessor.getChildNames(keyBuilder.messages(instanceName)).size());
for (Message message : messages) {
// Mock a change to ensure there will be some delta on the message node after update
message.setCorrelationId(UUID.randomUUID().toString());
}
updateMessageState.invoke(executor, messages, accessor, instanceName);
Assert
.assertEquals(accessor.getChildNames(keyBuilder.messages(instanceName)).size(), nMsgs1 - 1);
System.out.println("END " + TestHelper.getTestMethodName());
}
@Test(dependsOnMethods = "testStateTransitionCancellationMsg")
public void testStateTransitionMsgScheduleFailure() {
System.out.println("START " + TestHelper.getTestMethodName());
// Create a mock executor that fails the task scheduling.
HelixTaskExecutor executor = new HelixTaskExecutor() {
@Override
public boolean scheduleTask(MessageTask task) {
return false;
}
};
HelixManager manager = new MockClusterManager();
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
TestStateTransitionHandlerFactory stateTransitionFactory =
new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name());
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
stateTransitionFactory);
NotificationContext changeContext = new NotificationContext(manager);
Message msg = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
msg.setPartitionName("P1");
msg.setResourceName("R1");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setFromState("SLAVE");
msg.setToState("MASTER");
dataAccessor.setProperty(keyBuilder.message(manager.getInstanceName(), msg.getMsgId()), msg);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage(manager.getInstanceName(), Collections.emptyList(), changeContext);
Assert.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0);
// Message should have been removed
Assert.assertNull(
dataAccessor.getProperty(keyBuilder.message(manager.getInstanceName(), msg.getMsgId())));
// Current state would be ERROR due to the failure of task scheduling.
CurrentState currentState = dataAccessor.getProperty(keyBuilder
.currentState(manager.getInstanceName(), manager.getSessionId(), msg.getResourceName()));
Assert.assertNotNull(currentState);
Assert.assertEquals(currentState.getState(msg.getPartitionName()),
HelixDefinedState.ERROR.toString());
System.out.println("END " + TestHelper.getTestMethodName());
}
}