// blob: 6222df521d1f13d8b99abd99e81996649a2b2ac5 [file] [log] [blame]
package org.apache.helix.model;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
/**
* Messages sent internally among nodes in the system to respond to changes in state.
*/
public class Message extends HelixProperty {
  /**
   * The major categories of messages that are sent
   */
  public enum MessageType {
    STATE_TRANSITION,
    STATE_TRANSITION_CANCELLATION,
    SCHEDULER_MSG,
    USER_DEFINE_MSG,
    CONTROLLER_MSG,
    TASK_REPLY,
    NO_OP,
    PARTICIPANT_ERROR_REPORT,
    PARTICIPANT_SESSION_CHANGE,
    PARTICIPANT_STATUS_CHANGE,
    CHAINED_MESSAGE, // this is a message subtype
    RELAYED_MESSAGE
  }

  /**
   * Properties attached to Messages. The constant names are used verbatim as field keys in the
   * underlying record.
   */
  public enum Attributes {
    MSG_ID,
    RELAY_MSG_ID,
    SRC_SESSION_ID,
    TGT_SESSION_ID,
    SRC_NAME,
    TGT_NAME,
    SRC_INSTANCE_TYPE,
    MSG_STATE,
    PARTITION_NAME,
    RESOURCE_NAME,
    RESOURCE_GROUP_NAME,
    RESOURCE_TAG,
    FROM_STATE,
    TO_STATE,
    STATE_MODEL_DEF,
    CREATE_TIMESTAMP,
    COMPLETION_DUE_TIMESTAMP,
    READ_TIMESTAMP,
    EXECUTE_START_TIMESTAMP,
    MSG_TYPE,
    MSG_SUBTYPE,
    CORRELATION_ID,
    MESSAGE_RESULT,
    EXE_SESSION_ID,
    TIMEOUT,
    RETRY_COUNT,
    STATE_MODEL_FACTORY_NAME,
    BUCKET_SIZE,
    PARENT_MSG_ID, // used for group message mode
    ClusterEventName,
    INNER_MESSAGE,
    RELAY_PARTICIPANTS,
    RELAY_TIME,
    RELAY_FROM,
    EXPIRY_PERIOD,
    SRC_CLUSTER
  }

  /**
   * The current processed state of the message
   */
  public enum MessageState {
    NEW,
    READ, // not used
    UNPROCESSABLE // get exception when create handler
  }

  /** Default expiry period (in ms) for a relay message: 5 seconds. */
  public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000;

  // This field is not persisted in zk/znode, i.e, the value will only be changed in local cached
  // copy of the message.
  // Currently, the field is only used for invalidating messages in controller's message cache.
  private boolean _expired = false;

  /** The expected period of time (in ms) within which a message should complete; default 1 day. */
  public static final long MESSAGE_EXPECT_COMPLETION_PERIOD = HelixUtil
      .getSystemPropertyAsLong(SystemPropertyKeys.MESSAGE_EXPECTED_COMPLETION_PERIOD,
          TimeUnit.DAYS.toMillis(1));

  /**
   * Compares the creation time of two Messages (ascending by creation timestamp)
   */
  public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
    @Override
    public int compare(Message m1, Message m2) {
      // Long.compare avoids the deprecated boxing constructor and needless allocation.
      return Long.compare(m1.getCreateTimeStamp(), m2.getCreateTimeStamp());
    }
  };

  /**
   * Instantiate a message
   * @param type the message category
   * @param msgId unique message identifier
   */
  public Message(MessageType type, String msgId) {
    this(type.name(), msgId);
  }

  /**
   * Instantiate a message. The message starts in state {@link MessageState#NEW} and its creation
   * timestamp is set to the current time.
   * @param type {@link MessageType} as a string or a custom message type
   * @param msgId unique message identifier
   */
  public Message(String type, String msgId) {
    super(new SessionAwareZNRecord(msgId), msgId);
    _record.setSimpleField(Attributes.MSG_TYPE.toString(), type);
    setMsgId(msgId);
    setMsgState(MessageState.NEW);
    _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), System.currentTimeMillis());
  }

  /**
   * Instantiate a message from an existing record. A missing message state defaults to
   * {@link MessageState#NEW}; a missing (zero) creation timestamp defaults to the current time.
   * @param record a ZNRecord corresponding to a message
   */
  public Message(ZNRecord record) {
    super(new SessionAwareZNRecord(record, record.getId()));
    if (getMsgState() == null) {
      setMsgState(MessageState.NEW);
    }
    if (getCreateTimeStamp() == 0) {
      _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), System.currentTimeMillis());
    }
  }

  /**
   * Instantiate a message with a new id
   * @param record a ZNRecord corresponding to a message
   * @param id unique message identifier
   */
  public Message(ZNRecord record, String id) {
    super(new SessionAwareZNRecord(record, id));
    setMsgId(id);
  }

  /**
   * Instantiate a message with a new id
   * @param message message to be copied
   * @param id unique message identifier
   * @deprecated Not being used.
   */
  @Deprecated
  public Message(Message message, String id) {
    super(new SessionAwareZNRecord(message.getRecord(), id));
    setMsgId(id);
  }

  /**
   * Set the time that the message was created
   * @param timestamp a UNIX timestamp
   */
  public void setCreateTimeStamp(long timestamp) {
    _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), timestamp);
  }

  /**
   * Set the time that the message was expected to be completed
   * @param timestamp a UNIX timestamp (in ms)
   */
  public void setCompletionDueTimeStamp(long timestamp) {
    _record.setLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), timestamp);
  }

  /**
   * Set a subtype of the message
   * @param subType name of the subtype
   */
  public void setMsgSubType(String subType) {
    _record.setSimpleField(Attributes.MSG_SUBTYPE.toString(), subType);
  }

  /**
   * Get the subtype of the message
   * @return the subtype name, or null
   */
  public String getMsgSubType() {
    return _record.getSimpleField(Attributes.MSG_SUBTYPE.toString());
  }

  /**
   * Set the type of this message
   * @param type {@link MessageType}
   */
  void setMsgType(MessageType type) {
    _record.setSimpleField(Attributes.MSG_TYPE.toString(), type.toString());
  }

  /**
   * Get the type of this message
   * @return String {@link MessageType} or a custom message type
   */
  public String getMsgType() {
    return _record.getSimpleField(Attributes.MSG_TYPE.toString());
  }

  /**
   * Get the session identifier of the destination node
   * @return session identifier
   */
  public String getTgtSessionId() {
    return _record.getSimpleField(Attributes.TGT_SESSION_ID.toString());
  }

  /**
   * Set the session identifier of the destination node
   * @param tgtSessionId session identifier
   */
  public void setTgtSessionId(String tgtSessionId) {
    _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId);
  }

  /**
   * Get the session identifier of the source node
   * @return session identifier
   */
  public String getSrcSessionId() {
    return _record.getSimpleField(Attributes.SRC_SESSION_ID.toString());
  }

  /**
   * Set the session identifier of the source node
   * @param srcSessionId session identifier
   */
  public void setSrcSessionId(String srcSessionId) {
    _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId);
  }

  /**
   * Set the expected session id on the underlying {@link SessionAwareZNRecord}
   * @param expectedSessionId session identifier
   */
  public void setExpectedSessionId(String expectedSessionId) {
    ((SessionAwareZNRecord) _record).setExpectedSessionId(expectedSessionId);
  }

  /**
   * Get the session identifier of the node that executes the message
   * @return session identifier
   */
  public String getExecutionSessionId() {
    return _record.getSimpleField(Attributes.EXE_SESSION_ID.toString());
  }

  /**
   * Set the session identifier of the node that executes the message
   * @param exeSessionId session identifier
   */
  public void setExecuteSessionId(String exeSessionId) {
    _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId);
  }

  /**
   * Get the instance from which the message originated
   * @return the instance name
   */
  public String getMsgSrc() {
    return _record.getSimpleField(Attributes.SRC_NAME.toString());
  }

  /**
   * Set the type of instance that the source node is
   * @param type {@link InstanceType}
   */
  public void setSrcInstanceType(InstanceType type) {
    _record.setEnumField(Attributes.SRC_INSTANCE_TYPE.toString(), type);
  }

  /**
   * Get the type of instance that the source is
   * @return {@link InstanceType}, defaulting to {@link InstanceType#PARTICIPANT}
   */
  public InstanceType getSrcInstanceType() {
    return _record.getEnumField(Attributes.SRC_INSTANCE_TYPE.toString(), InstanceType.class,
        InstanceType.PARTICIPANT);
  }

  /**
   * Set the name of the source instance
   * @param msgSrc instance name
   */
  public void setSrcName(String msgSrc) {
    _record.setSimpleField(Attributes.SRC_NAME.toString(), msgSrc);
  }

  /**
   * Get the name of the target instance
   * @return instance name
   */
  public String getTgtName() {
    return _record.getSimpleField(Attributes.TGT_NAME.toString());
  }

  /**
   * Set the current state of the message
   * @param msgState {@link MessageState}
   */
  public void setMsgState(MessageState msgState) {
    // HACK: The "toLowerCase()" call is to make the change backward compatible
    _record.setSimpleField(Attributes.MSG_STATE.toString(), msgState.toString().toLowerCase());
  }

  /**
   * Get the current state of the message
   * @return {@link MessageState}, or null if no state is stored
   */
  public MessageState getMsgState() {
    String storedState = _record.getSimpleField(Attributes.MSG_STATE.toString());
    if (storedState == null) {
      return null;
    }
    // HACK: The "toUpperCase()" call is to make the change backward compatible
    return MessageState.valueOf(storedState.toUpperCase());
  }

  /**
   * Set the name of the partition this message concerns
   * @param partitionName partition name
   */
  public void setPartitionName(String partitionName) {
    _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionName);
  }

  /**
   * Get the unique identifier of this message
   * @return message identifier
   */
  public String getMsgId() {
    return _record.getSimpleField(Attributes.MSG_ID.toString());
  }

  /**
   * Set the unique identifier of this message
   * @param msgId message identifier
   */
  public void setMsgId(String msgId) {
    _record.setSimpleField(Attributes.MSG_ID.toString(), msgId);
  }

  /**
   * Set the "from state" for transition-related messages
   * @param state the state name
   */
  public void setFromState(String state) {
    _record.setSimpleField(Attributes.FROM_STATE.toString(), state);
  }

  /**
   * Get the "from-state" for transition-related messages
   * @return state name, or null for other message types
   */
  public String getFromState() {
    return _record.getSimpleField(Attributes.FROM_STATE.toString());
  }

  /**
   * Set the "to state" for transition-related messages
   * @param state the state name
   */
  public void setToState(String state) {
    _record.setSimpleField(Attributes.TO_STATE.toString(), state);
  }

  /**
   * Get the "to state" for transition-related messages
   * @return state name, or null for other message types
   */
  public String getToState() {
    return _record.getSimpleField(Attributes.TO_STATE.toString());
  }

  /**
   * Set the instance for which this message is targeted
   * @param msgTgt instance name
   */
  public void setTgtName(String msgTgt) {
    _record.setSimpleField(Attributes.TGT_NAME.toString(), msgTgt);
  }

  /**
   * Check for debug mode
   * @return always false in this implementation
   */
  public Boolean getDebug() {
    return false;
  }

  /**
   * Get the generation that this message corresponds to
   * @return always 1 in this implementation
   */
  public Integer getGeneration() {
    return 1;
  }

  /**
   * Set the resource associated with this message
   * @param resourceName resource name to set
   */
  public void setResourceName(String resourceName) {
    _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
  }

  /**
   * Get the resource associated with this message
   * @return resource name
   */
  public String getResourceName() {
    return _record.getSimpleField(Attributes.RESOURCE_NAME.toString());
  }

  /**
   * Set the resource group associated with this message
   * @param resourceGroupName resource group name to set
   */
  public void setResourceGroupName(String resourceGroupName) {
    _record.setSimpleField(Attributes.RESOURCE_GROUP_NAME.toString(), resourceGroupName);
  }

  /**
   * Get the resource group name associated with this message
   * @return resource group name
   */
  public String getResourceGroupName() {
    return _record.getSimpleField(Attributes.RESOURCE_GROUP_NAME.toString());
  }

  /**
   * Set the resource tag associated with this message
   * @param resourceTag resource tag to set
   */
  public void setResourceTag(String resourceTag) {
    _record.setSimpleField(Attributes.RESOURCE_TAG.toString(), resourceTag);
  }

  /**
   * Get the resource tag associated with this message
   * @return resource tag
   */
  public String getResourceTag() {
    return _record.getSimpleField(Attributes.RESOURCE_TAG.toString());
  }

  /**
   * Get the resource partition associated with this message
   * @return partition name
   */
  public String getPartitionName() {
    return _record.getSimpleField(Attributes.PARTITION_NAME.toString());
  }

  /**
   * Get the state model definition name
   * @return a String reference to the state model definition, e.g. "MasterSlave"
   */
  public String getStateModelDef() {
    return _record.getSimpleField(Attributes.STATE_MODEL_DEF.toString());
  }

  /**
   * Set the state model definition name
   * @param stateModelDefName a reference to the state model definition, e.g. "MasterSlave"
   */
  public void setStateModelDef(String stateModelDefName) {
    _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefName);
  }

  /**
   * Set the time that this message was read
   * @param time UNIX timestamp (in ms)
   */
  public void setReadTimeStamp(long time) {
    _record.setLongField(Attributes.READ_TIMESTAMP.toString(), time);
  }

  /**
   * Set the time that the instance executes tasks as instructed by this message
   * @param time UNIX timestamp (in ms)
   */
  public void setExecuteStartTimeStamp(long time) {
    _record.setLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), time);
  }

  /**
   * Get the time that this message was read
   * @return UNIX timestamp (in ms), or 0 if not set
   */
  public long getReadTimeStamp() {
    return _record.getLongField(Attributes.READ_TIMESTAMP.toString(), 0L);
  }

  /**
   * Get the time that execution occurred as a result of this message
   * @return UNIX timestamp (in ms), or 0 if not set
   */
  public long getExecuteStartTimeStamp() {
    return _record.getLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), 0L);
  }

  /**
   * Get the time that this message was created
   * @return UNIX timestamp (in ms), or 0 if not set
   */
  public long getCreateTimeStamp() {
    return _record.getLongField(Attributes.CREATE_TIMESTAMP.toString(), 0L);
  }

  /**
   * Get the time that the message was expected to be completed. When no due time is stored, it
   * is derived as creation time plus {@link #MESSAGE_EXPECT_COMPLETION_PERIOD}.
   * @return UNIX timestamp (in ms)
   */
  public long getCompletionDueTimeStamp() {
    long storedDue = _record.getLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), 0L);
    if (storedDue != 0) {
      return storedDue;
    }
    return getCreateTimeStamp() + MESSAGE_EXPECT_COMPLETION_PERIOD;
  }

  /**
   * Set a unique identifier that others can use to refer to this message in replies
   * @param correlationId a unique identifier, usually randomly generated
   */
  public void setCorrelationId(String correlationId) {
    _record.setSimpleField(Attributes.CORRELATION_ID.toString(), correlationId);
  }

  /**
   * Get the unique identifier attached to this message for reply matching
   * @return the correlation identifier
   */
  public String getCorrelationId() {
    return _record.getSimpleField(Attributes.CORRELATION_ID.toString());
  }

  /**
   * Get the time to wait before stopping execution of this message
   * @return the timeout in ms, or -1 indicating no timeout
   */
  public int getExecutionTimeout() {
    return _record.getIntField(Attributes.TIMEOUT.toString(), -1);
  }

  /**
   * Set the time to wait before stopping execution of this message
   * @param timeout the timeout in ms, or -1 indicating no timeout
   */
  public void setExecutionTimeout(int timeout) {
    _record.setIntField(Attributes.TIMEOUT.toString(), timeout);
  }

  /**
   * Set the number of times to retry message handling on timeouts and other unexpected conditions
   * with Exceptions. For example, the participant failed to create message handler.
   * @param retryCount maximum number of retries
   */
  public void setRetryCount(int retryCount) {
    _record.setIntField(Attributes.RETRY_COUNT.toString(), retryCount);
  }

  /**
   * Get the number of times to retry message handling on timeouts and other unexpected conditions
   * with Exceptions. For example, the participant failed to create message handler.
   * @return maximum number of retries, or 0 if not set
   */
  public int getRetryCount() {
    return _record.getIntField(Attributes.RETRY_COUNT.toString(), 0);
  }

  /**
   * Get the results of message execution
   * @return map of result property and value pairs
   */
  public Map<String, String> getResultMap() {
    return _record.getMapField(Attributes.MESSAGE_RESULT.toString());
  }

  /**
   * Set the results of message execution
   * @param resultMap map of result property and value pairs
   */
  public void setResultMap(Map<String, String> resultMap) {
    _record.setMapField(Attributes.MESSAGE_RESULT.toString(), resultMap);
  }

  /**
   * Get the state model factory associated with this message
   * @return the name of the factory
   */
  public String getStateModelFactoryName() {
    return _record.getSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString());
  }

  /**
   * Set the state model factory associated with this message
   * @param factoryName the name of the factory
   */
  public void setStateModelFactoryName(String factoryName) {
    _record.setSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString(), factoryName);
  }

  // TODO: remove this. impl in HelixProperty
  @Override
  public int getBucketSize() {
    return _record.getIntField(Attributes.BUCKET_SIZE.toString(), 0);
  }

  /**
   * Set the bucket size. Non-positive values are ignored and leave the stored value untouched.
   * @param bucketSize the bucket size to store
   */
  @Override
  public void setBucketSize(int bucketSize) {
    if (bucketSize > 0) {
      _record.setIntField(Attributes.BUCKET_SIZE.toString(), bucketSize);
    }
  }

  /**
   * Add or change a message attribute
   * @param attr {@link Attributes} attribute name
   * @param val attribute value
   */
  public void setAttribute(Attributes attr, String val) {
    _record.setSimpleField(attr.toString(), val);
  }

  /**
   * Get the value of an attribute
   * @param attr {@link Attributes}
   * @return attribute value
   */
  public String getAttribute(Attributes attr) {
    return _record.getSimpleField(attr.toString());
  }

  /**
   * Create a reply based on an incoming message. The reply is a new
   * {@link MessageType#TASK_REPLY} message with a random UUID as its id, carrying the source
   * message's correlation id. It is targeted at the controller when the source message came from
   * a controller, otherwise at the source message's sender.
   * @param srcMessage the incoming message
   * @param instanceName the instance that is the source of the reply
   * @param taskResultMap the result of executing the incoming message
   * @return the reply Message
   * @throws HelixException if the incoming message has no correlation id
   */
  public static Message createReplyMessage(Message srcMessage, String instanceName,
      Map<String, String> taskResultMap) {
    String correlationId = srcMessage.getCorrelationId();
    if (correlationId == null) {
      throw new HelixException(
          "Message " + srcMessage.getMsgId() + " does not contain correlation id");
    }

    Message replyMessage = new Message(MessageType.TASK_REPLY, UUID.randomUUID().toString());
    replyMessage.setCorrelationId(correlationId);
    replyMessage.setResultMap(taskResultMap);
    replyMessage.setTgtSessionId("*");
    replyMessage.setMsgState(MessageState.NEW);
    replyMessage.setSrcName(instanceName);

    if (srcMessage.getSrcInstanceType() == InstanceType.CONTROLLER) {
      replyMessage.setTgtName(InstanceType.CONTROLLER.name());
    } else {
      replyMessage.setTgtName(srcMessage.getMsgSrc());
    }
    return replyMessage;
  }

  /**
   * Add a partition to a collection of partitions associated with this message. Adding a name
   * that is already present is a no-op.
   * @param partitionName the partition name to add
   */
  public void addPartitionName(String partitionName) {
    String key = Attributes.PARTITION_NAME.toString();
    if (_record.getListField(key) == null) {
      _record.setListField(key, new ArrayList<String>());
    }
    // Re-read after the set so that we append to the list instance held by the record.
    List<String> partitionNames = _record.getListField(key);
    if (!partitionNames.contains(partitionName)) {
      partitionNames.add(partitionName);
    }
  }

  /**
   * Get a list of partitions associated with this message
   * @return list of partition names, or an empty list if none were added
   */
  public List<String> getPartitionNames() {
    List<String> partitionNames = _record.getListField(Attributes.PARTITION_NAME.toString());
    if (partitionNames == null) {
      return Collections.emptyList();
    }
    return partitionNames;
  }

  /**
   * Get the completion time of previous task associated with this message.
   * This applies only when this is a relay message,
   * which specified the completion time of the task running on the participant that sent this relay
   * message.
   * @return the relay time, or -1 if not set
   */
  public long getRelayTime() {
    return _record.getLongField(Attributes.RELAY_TIME.name(), -1);
  }

  /**
   * Set the completion time of previous task associated with this message.
   * This applies only when this is a relay message,
   * which specified the completion time of the task running on the participant that sent this relay
   * message.
   * @param completionTime the completion time to store as the relay time
   */
  public void setRelayTime(long completionTime) {
    _record.setLongField(Attributes.RELAY_TIME.name(), completionTime);
  }

  /**
   * Attach a relayed message and its destination participant to this message.
   * WARNING: only content in SimpleFields of relayed message will be carried over and sent,
   * all contents in either ListFields or MapFields will be ignored.
   * The relayed message is stored as a map field keyed by the destination instance name, and the
   * instance is added to the RELAY_PARTICIPANTS list (insertion order kept, duplicates dropped).
   * @param instance destination participant name
   * @param message relayed message.
   */
  public void attachRelayMessage(String instance, Message message) {
    List<String> relayList = _record.getListField(Attributes.RELAY_PARTICIPANTS.name());
    if (relayList == null) {
      relayList = Collections.emptyList();
    }
    Set<String> relayParticipants = new LinkedHashSet<>(relayList);
    relayParticipants.add(instance);

    // NOTE(review): if getSimpleFields() returns the record's live map rather than a copy, the
    // puts below also modify the relayed message itself. Kept as-is because callers may rely on
    // it; confirm whether that side effect is intended.
    Map<String, String> messageInfo = message.getRecord().getSimpleFields();
    messageInfo.put(Attributes.RELAY_MSG_ID.name(), message.getId());
    messageInfo.put(Attributes.MSG_SUBTYPE.name(), MessageType.RELAYED_MESSAGE.name());
    messageInfo.put(Attributes.RELAY_FROM.name(), getTgtName());
    messageInfo.put(Attributes.EXPIRY_PERIOD.name(), String.valueOf(RELAY_MESSAGE_DEFAULT_EXPIRY));

    _record.setMapField(instance, messageInfo);
    _record.setListField(Attributes.RELAY_PARTICIPANTS.name(),
        Lists.newArrayList(relayParticipants));
  }

  /**
   * Get relay message attached for the given instance. The message id is taken from RELAY_MSG_ID,
   * falling back to MSG_ID when the former is absent.
   * @param instance destination participant name
   * @return null if no message for the instance, or if the stored entry carries no message id
   */
  public Message getRelayMessage(String instance) {
    Map<String, String> messageInfo = _record.getMapField(instance);
    if (messageInfo == null) {
      return null;
    }
    String id = messageInfo.get(Attributes.RELAY_MSG_ID.name());
    if (id == null) {
      id = messageInfo.get(Attributes.MSG_ID.name());
    }
    if (id == null) {
      return null;
    }
    ZNRecord record = new ZNRecord(id);
    record.setSimpleFields(messageInfo);
    return new Message(record);
  }

  /**
   * Get the value of the RELAY_FROM field. {@link #attachRelayMessage(String, Message)} fills it
   * with the target name of the message that carries the relay message.
   * @return the stored RELAY_FROM value, or null if not set
   */
  public String getRelaySrcHost() {
    return _record.getSimpleField(Attributes.RELAY_FROM.name());
  }

  /**
   * Get all relay messages attached to this message as a map (instance->message).
   * Participants listed in RELAY_PARTICIPANTS for which no relay message can be resolved are
   * omitted, so the returned map never contains null values.
   * @return map of instanceName->message, empty map if none.
   */
  public Map<String, Message> getRelayMessages() {
    Map<String, Message> relayMessageMap = new HashMap<>();
    List<String> relayParticipants = _record.getListField(Attributes.RELAY_PARTICIPANTS.name());
    if (relayParticipants != null) {
      for (String participant : relayParticipants) {
        Message relayMessage = getRelayMessage(participant);
        // Guard on the resolved message (not the participant name): getRelayMessage() returns
        // null when nothing is attached for the participant.
        if (relayMessage != null) {
          relayMessageMap.put(participant, relayMessage);
        }
      }
    }
    return relayMessageMap;
  }

  /**
   * Whether there are any relay message attached to this message.
   * @return true if the RELAY_PARTICIPANTS list exists and is non-empty
   */
  public boolean hasRelayMessages() {
    List<String> relayHosts = _record.getListField(Attributes.RELAY_PARTICIPANTS.name());
    return relayHosts != null && !relayHosts.isEmpty();
  }

  /**
   * Whether this message is a relay message.
   * @return true if the subtype is RELAYED_MESSAGE and RELAY_FROM is set
   */
  public boolean isRelayMessage() {
    String subType = _record.getStringField(Attributes.MSG_SUBTYPE.name(), null);
    String relayFrom = _record.getStringField(Attributes.RELAY_FROM.name(), null);
    return MessageType.RELAYED_MESSAGE.name().equals(subType) && (relayFrom != null);
  }

  /**
   * Whether a message is expired.
   * A message is expired if it was explicitly marked via {@link #setExpired(boolean)}, or if it
   * has a non-negative expiry period and:
   * 1) creationTime + expiryPeriod &lt; current time
   * or
   * 2) relayTime + expiryPeriod &lt; current time iff it is relay message (a relay message whose
   * relay time is not positive is not considered expired).
   * @return true if the message is expired, false otherwise
   */
  public boolean isExpired() {
    if (_expired) {
      return true;
    }

    long expiry = getExpiryPeriod();
    if (expiry < 0) {
      // No expiry period configured.
      return false;
    }

    long current = System.currentTimeMillis();
    // use relay time if this is a relay message
    if (isRelayMessage()) {
      long relayTime = getRelayTime();
      return (relayTime > 0 && (relayTime + expiry < current));
    }
    return getCreateTimeStamp() + expiry < current;
  }

  /**
   * Set a message to expired.
   * !! CAUTION: The expired field is not persisted into ZNODE,
   * i.e, set this field will only change its value in its local cache version,
   * not the one on ZK, even ZkClient.Set(Message) is called to persist it into ZK.
   * This method should NOT be called by any non-Helix code.
   * @param expired whether to mark the local copy as expired
   */
  public void setExpired(boolean expired) {
    _expired = expired;
  }

  /**
   * Get the expiry period (in milliseconds)
   * @return the expiry period, or -1 if not set
   */
  public long getExpiryPeriod() {
    return _record.getLongField(Attributes.EXPIRY_PERIOD.name(), -1);
  }

  /**
   * Set expiry period for this message.
   * A message will be expired after this period of time from either its 1) creationTime or 2)
   * relayTime if it is relay message.
   * Default is -1 if it is not set.
   * @param expiry the expiry period (in milliseconds)
   */
  public void setExpiryPeriod(long expiry) {
    _record.setLongField(Attributes.EXPIRY_PERIOD.name(), expiry);
  }

  /**
   * Get the source cluster name
   * @return the source cluster from where the message was sent or null if the message was sent
   *         locally
   */
  public String getSrcClusterName() {
    return _record.getStringField(Attributes.SRC_CLUSTER.name(), null);
  }

  /**
   * Set the source cluster name
   * @param clusterName source cluster name where message was sent from
   */
  public void setSrcClusterName(String clusterName) {
    _record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
  }

  /**
   * Check if this message is targeted for a controller
   * @return true if this is a controller message, false otherwise (including when no target
   *         name is set)
   */
  public boolean isControlerMsg() {
    // Constant-first comparison: a message without TGT_NAME is simply not a controller message.
    return InstanceType.CONTROLLER.name().equalsIgnoreCase(getTgtName());
  }

  /**
   * Check if this message's type is {@link MessageType#PARTICIPANT_STATUS_CHANGE}
   * (case-insensitive comparison)
   * @return true if it is, false otherwise
   */
  public boolean isParticipantStatusChangeType() {
    return MessageType.PARTICIPANT_STATUS_CHANGE.name().equalsIgnoreCase(getMsgType());
  }

  /**
   * Get the {@link PropertyKey} for this message
   * @param keyBuilder PropertyKey Builder
   * @param instanceName target instance
   * @return message PropertyKey
   */
  public PropertyKey getKey(Builder keyBuilder, String instanceName) {
    if (isControlerMsg()) {
      return keyBuilder.controllerMessage(getId());
    }
    return keyBuilder.message(instanceName, getId());
  }

  // True when the value is null, empty, or whitespace only.
  private boolean isNullOrEmpty(String data) {
    return data == null || data.trim().isEmpty();
  }

  /**
   * Validate this message. State-transition and state-transition-cancellation messages must
   * carry a non-blank target name, partition name, resource name, state model definition,
   * from-state, to-state and target session id. Every other message type (including a missing
   * type) is considered valid.
   * @return true if the message is valid, false otherwise
   */
  @Override
  public boolean isValid() {
    // TODO: refactor message to state transition message and task-message and
    // implement this function separately
    String msgType = getMsgType();
    // Constant-first equals keeps this safe for records that carry no MSG_TYPE.
    boolean isTransitionMessage = MessageType.STATE_TRANSITION.name().equals(msgType)
        || MessageType.STATE_TRANSITION_CANCELLATION.name().equals(msgType);
    if (!isTransitionMessage) {
      return true;
    }

    boolean isNotValid = isNullOrEmpty(getTgtName()) || isNullOrEmpty(getPartitionName())
        || isNullOrEmpty(getResourceName()) || isNullOrEmpty(getStateModelDef())
        || isNullOrEmpty(getToState()) || isNullOrEmpty(getFromState())
        || isNullOrEmpty(getTgtSessionId());
    return !isNotValid;
  }
}