// blob: 94de8331b13a2c83eb2f5c6403ff83a09377c09d [file] [log] [blame]
package org.apache.helix.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.UUID;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utilities for operating on messages, such as creating them.
*/
public class MessageUtil {
  private static final Logger LOG = LoggerFactory.getLogger(MessageUtil.class);

  // TODO: Make the message retry count configurable through the Cluster Config or IdealStates.
  /** Retry count assigned to every state transition message built by this class. */
  public static final int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;

  /**
   * Builds a state transition cancellation message, but only when cancellation is enabled and no
   * cancellation message has been supplied by the caller.
   *
   * @param srcInstanceName source instance name
   * @param srcSessionId session id of the source instance; also recorded as the expected session id
   * @param resource resource supplying the resource name, state model factory name and bucket size
   * @param partitionName partition the message refers to
   * @param instanceName target instance name
   * @param sessionId session id of the target instance
   * @param stateModelDefName state model definition name
   * @param fromState from-state stored on the returned message
   * @param toState to-state stored on the returned message
   * @param nextState next state; logged (as "N/A" when null) and passed to the shared builder
   * @param cancellationMessage an already available cancellation message, or null if there is none
   * @param isCancellationEnabled whether cancellation messages may be created
   * @param currentState current state; logged and passed to the shared builder
   * @return the newly created cancellation message, or null when cancellation is disabled or
   *         {@code cancellationMessage} is not null
   */
  public static Message createStateTransitionCancellationMessage(String srcInstanceName,
      String srcSessionId, Resource resource, String partitionName, String instanceName,
      String sessionId, String stateModelDefName, String fromState, String toState,
      String nextState, Message cancellationMessage, boolean isCancellationEnabled,
      String currentState) {
    // Nothing to build when cancellation is off or a cancellation message was already supplied.
    if (!isCancellationEnabled || cancellationMessage != null) {
      return null;
    }

    String nextStateForLog = (nextState != null) ? nextState : "N/A";
    LOG.info("Create cancellation message of the state transition for {}.{} on {}, "
            + "currentState: {}, nextState: {}, toState: {}", resource.getResourceName(),
        partitionName, instanceName, currentState, nextStateForLog, toState);

    Message cancellation =
        createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
            srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
            nextState, sessionId, stateModelDefName);

    // The shared builder stored currentState/nextState as the from/to pair; the cancellation
    // message carries the explicitly supplied fromState/toState instead.
    cancellation.setFromState(fromState);
    cancellation.setToState(toState);
    return cancellation;
  }

  /**
   * Builds a state transition message with the default retry count, copying the resource group
   * name and resource tag from the resource whenever they are set.
   *
   * @param srcInstanceName source instance name
   * @param srcSessionId session id of the source instance; also recorded as the expected session id
   * @param resource resource the transition belongs to
   * @param partitionName partition the message refers to
   * @param instanceName target instance name
   * @param currentState state stored as the message's from-state
   * @param nextState state stored as the message's to-state
   * @param tgtSessionId session id of the target instance
   * @param stateModelDefName state model definition name
   * @return the state transition message
   */
  public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
      Resource resource, String partitionName, String instanceName, String currentState,
      String nextState, String tgtSessionId, String stateModelDefName) {
    Message transition =
        createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
            srcSessionId, resource, partitionName, instanceName, currentState, nextState,
            tgtSessionId, stateModelDefName);

    // TODO: make the retry count configurable in ClusterConfig or IdealState
    transition.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);

    // Resource group name and tag are optional; only propagate them when present.
    String resourceGroupName = resource.getResourceGroupName();
    if (resourceGroupName != null) {
      transition.setResourceGroupName(resourceGroupName);
    }

    String resourceTag = resource.getResourceTag();
    if (resourceTag != null) {
      transition.setResourceTag(resourceTag);
    }

    return transition;
  }

  /**
   * Builds a message asking a participant to change its
   * {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}.
   *
   * @param currentState current status of the live instance
   * @param nextState status the live instance should change to
   * @param srcInstanceName source instance name
   * @param srcSessionId session id of the source instance
   * @param tgtInstanceName target instance name
   * @param tgtSessionId session id of the target instance
   * @return participant status change message
   */
  public static Message createStatusChangeMessage(LiveInstance.LiveInstanceStatus currentState,
      LiveInstance.LiveInstanceStatus nextState, String srcInstanceName, String srcSessionId,
      String tgtInstanceName, String tgtSessionId) {
    // The statuses are carried in the message as their enum constant names.
    String fromStatus = currentState.name();
    String toStatus = nextState.name();
    return createBasicMessage(Message.MessageType.PARTICIPANT_STATUS_CHANGE, srcInstanceName,
        srcSessionId, tgtInstanceName, tgtSessionId, fromStatus, toStatus);
  }

  /*
   * Creates a message that has only the least required fields: a random UUID as the message id,
   * source and target names, the NEW message state, the from/to states and the session ids.
   */
  private static Message createBasicMessage(Message.MessageType messageType, String srcInstanceName,
      String srcSessionId, String tgtInstanceName, String tgtSessionId, String currentState,
      String nextState) {
    Message basic = new Message(messageType, UUID.randomUUID().toString());

    basic.setSrcName(srcInstanceName);
    basic.setTgtName(tgtInstanceName);
    basic.setMsgState(Message.MessageState.NEW);
    basic.setFromState(currentState);
    basic.setToState(nextState);
    basic.setTgtSessionId(tgtSessionId);
    basic.setSrcSessionId(srcSessionId);
    // The expected session id is the source's session id.
    basic.setExpectedSessionId(srcSessionId);

    return basic;
  }

  /*
   * Creates a state transition or state transition cancellation message: a basic message of the
   * given type, extended with the partition, state model definition and resource details.
   */
  private static Message createStateTransitionMessage(Message.MessageType messageType,
      String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
      String instanceName, String currentState, String nextState, String tgtSessionId,
      String stateModelDefName) {
    Message transition =
        createBasicMessage(messageType, srcInstanceName, srcSessionId, instanceName, tgtSessionId,
            currentState, nextState);

    transition.setPartitionName(partitionName);
    transition.setStateModelDef(stateModelDefName);
    transition.setResourceName(resource.getResourceName());
    transition.setStateModelFactoryName(resource.getStateModelFactoryname());
    transition.setBucketSize(resource.getBucketSize());

    return transition;
  }
}