blob: 1d70f991fa26e9357c903fc08c6b1d3d455eec13 [file] [log] [blame]
package org.apache.helix.util;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.model.Error;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Util class to create statusUpdates ZK records and error ZK records. These message
 * records are for diagnostics only, and they are stored on the "StatusUpdates" and
 * "errors" ZNodes in the zookeeper instances.
 */
public class StatusUpdateUtil {
// Logger shared by all instances; declared non-final and package-private.
static Logger _logger = LoggerFactory.getLogger(StatusUpdateUtil.class);
// NOTE(review): the initializer of this flag is not visible in this view. Presumably it
// controls whether error records are written to ZooKeeper and is read from a system
// property (SystemPropertyKeys is imported) -- confirm against the complete file.
public static final boolean ERROR_LOG_TO_ZK_ENABLED =
public static class Transition implements Comparable<Transition> {
private final String _msgID;
private final long _timeStamp;
private final String _from;
private final String _to;
public Transition(String msgID, long timeStamp, String from, String to) {
this._msgID = msgID;
this._timeStamp = timeStamp;
this._from = from;
this._to = to;
public int compareTo(Transition t) {
if (_timeStamp < t._timeStamp)
return -1;
else if (_timeStamp > t._timeStamp)
return 1;
return 0;
public boolean equals(Transition t) {
return (_timeStamp == t._timeStamp && _from.equals(t._from) && _to.equals(t._to));
public String getFromState() {
return _from;
public String getToState() {
return _to;
public String getMsgID() {
return _msgID;
public String toString() {
return _msgID + ":" + _timeStamp + ":" + _from + "->" + _to;
// Status assigned to a task message when status-update records are parsed.
// NOTE(review): the enum constants are not visible in this view; the code in this file
// references UNKNOWN, SCHEDULED, INVOKING and COMPLETED.
public static enum TaskStatus {
// Holder for what was read back from status-update records: a list of transitions and a map
// from message id to task status.
public static class StatusUpdateContents {
private final List<Transition> _transitions;
private final Map<String, TaskStatus> _taskMessages;
// Private constructor: the static getStatusUpdateContents(...) method in this class is the
// visible place where instances are created.
private StatusUpdateContents(List<Transition> transitions, Map<String, TaskStatus> taskMessages) {
this._transitions = transitions;
this._taskMessages = taskMessages;
// Convenience overload without a session id: delegates to the five-argument variant with a
// null sessionID.
public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
String instance, String resourceGroup, String partition) {
return getStatusUpdateContents(accessor, instance, resourceGroup, null, partition);
// TODO: build a map keyed by the (instance, resourceGroup, session, partition) tuple and
// look the key up instead of scanning every time. Such a map would be very similar to what
// already exists in ZNRecord.
// Passing null for sessionID results in searching across all sessions.
//
// NOTE(review): several lines of this method are not visible in this view (the bodies of the
// filtering `if` statements, the tails of some calls, and the closing braces). The comments
// below describe only what the visible lines show.
public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
String instance, String resourceGroup, String sessionID, String partition) {
Builder keyBuilder = accessor.keyBuilder();
// Read the instance configs; each record id is used as an instance name below.
List<ZNRecord> instances =
HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs(), true));
// Handed to getSortedTransitions() at the end; the statement that fills it is not visible.
List<ZNRecord> partitionRecords = new ArrayList<ZNRecord>();
for (ZNRecord znRecord : instances) {
String instanceName = znRecord.getId();
// Test for instances other than the requested one (presumably skipped -- confirm).
if (!instanceName.equals(instance)) {
List<String> sessions = accessor.getChildNames(keyBuilder.sessions(instanceName));
for (String session : sessions) {
// The session filter only applies when a sessionID was supplied.
if (sessionID != null && !session.equals(sessionID)) {
List<String> resourceGroups =
accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName, session));
for (String resourceGroupName : resourceGroups) {
// Test for resource groups other than the requested one (presumably skipped -- confirm).
if (!resourceGroupName.equals(resourceGroup)) {
List<String> partitionStrings =
accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName, session,
for (String partitionString : partitionStrings) {
ZNRecord partitionRecord =
keyBuilder.stateTransitionStatus(instanceName, session, resourceGroupName,
// Test for partitions other than the requested one (presumably skipped -- confirm).
if (!partitionString.equals(partition)) {
// The result wraps the transitions derived from the collected records; the second
// constructor argument is not visible in this view.
return new StatusUpdateContents(getSortedTransitions(partitionRecords),
// Returns the transition list held by this object (the same list instance, not a copy).
public List<Transition> getTransitions() {
return _transitions;
// Returns the message-id to task-status map held by this object (the same map instance, not
// a copy).
public Map<String, TaskStatus> getTaskMessages() {
return _taskMessages;
// Input: List<ZNRecord> corresponding to (instance, database, partition) tuples across all
// sessions.
// Returns the list of transitions, documented as sorted from earliest to latest.
// NOTE(review): no sort call is visible between building the list and returning it in this
// view; confirm that the complete file sorts the list before relying on the order.
private static List<Transition> getSortedTransitions(List<ZNRecord> partitionRecords) {
List<Transition> transitions = new ArrayList<Transition>();
for (ZNRecord partition : partitionRecords) {
Map<String, Map<String, String>> mapFields = partition.getMapFields();
for (String key : mapFields.keySet()) {
// Only map fields whose key starts with "MESSAGE" are considered; createMessageLogRecord()
// writes keys of the form "MESSAGE <msgId>".
if (key.startsWith("MESSAGE")) {
Map<String, String> m = mapFields.get(key);
// Stays 0 when CREATE_TIMESTAMP is missing or unparsable: the parse is wrapped in a
// catch-all whose handler body is not visible in this view.
long createTimeStamp = 0;
try {
createTimeStamp = Long.parseLong(m.get("CREATE_TIMESTAMP"));
} catch (Exception e) {
// NOTE(review): the final constructor argument is cut off in this view.
transitions.add(new Transition(m.get("MSG_ID"), createTimeStamp, m.get("FROM_STATE"), m
return transitions;
private static Map<String, TaskStatus> getTaskMessages(List<ZNRecord> partitionRecords) {
Map<String, TaskStatus> taskMessages = new HashMap<String, TaskStatus>();
for (ZNRecord partition : partitionRecords) {
Map<String, Map<String, String>> mapFields = partition.getMapFields();
// iterate over the task status updates in the order they occurred
// so that the last status can be recorded
for (String key : mapFields.keySet()) {
if (key.contains("STATE_TRANSITION")) {
Map<String, String> m = mapFields.get(key);
String id = m.get("MSG_ID");
String statusString = m.get("AdditionalInfo");
TaskStatus status = TaskStatus.UNKNOWN;
if (statusString.contains("scheduled"))
status = TaskStatus.SCHEDULED;
else if (statusString.contains("invoking"))
status = TaskStatus.INVOKING;
else if (statusString.contains("completed"))
status = TaskStatus.COMPLETED;
taskMessages.put(id, status);
return taskMessages;
// Severity of a status update.
// NOTE(review): the enum constants are not visible in this view; the code in this file
// references HELIX_ERROR, HELIX_WARNING and HELIX_INFO.
public enum Level {
/**
 * Creates an empty ZNRecord as the statusUpdate/error record.
 * @param id
 *          the id of the new record
 */
public ZNRecord createEmptyStatusUpdateRecord(String id) {
// Only the id is set here; callers in this file add map fields to the record afterwards.
return new ZNRecord(id);
/**
 * Create a ZNRecord for a message, which stores the content of the message (stored in
 * simple fields) into the ZNRecord mapFields. In this way, the message update can be
 * merged with the previous status update record in the zookeeper. See ZNRecord.merge()
 * for more details.
 */
ZNRecord createMessageLogRecord(Message message) {
// The record name is chosen by getStatusUpdateRecordName(message).
ZNRecord result = new ZNRecord(getStatusUpdateRecordName(message));
// One map field per message, keyed "MESSAGE <msgId>"; getSortedTransitions() selects map
// fields by this "MESSAGE" prefix.
String mapFieldKey = "MESSAGE " + message.getMsgId();
result.setMapField(mapFieldKey, new TreeMap<String, String>());
// Store all the simple fields of the message in the new ZNRecord's map
// field.
// NOTE(review): the body of this loop is not visible in this view.
for (String simpleFieldKey : message.getRecord().getSimpleFields().keySet()) {
// When the message carries a result map, attach it under "MessageResult".
if (message.getResultMap() != null) {
result.setMapField("MessageResult", message.getResultMap());
return result;
// Ids of messages for which publishStatusUpdateRecord() has already written the message-log
// record; used as a set (key and value are both the message id).
// NOTE(review): no removal from this map is visible in this file, so it presumably grows for
// the lifetime of this object -- confirm.
Map<String, String> _recordedMessages = new ConcurrentHashMap<>();
/**
 * Create a status update that is related to a cluster manager message.
 * @param message
 *          the related cluster manager message
 * @param level
 *          the error level
 * @param classInfo
 *          class info about the class that reports the status update
 * @param additionalInfo
 *          the additional debug information
 */
public ZNRecord createMessageStatusUpdateRecord(Message message, Level level, Class classInfo,
String additionalInfo) {
// The record name is chosen by getStatusUpdateRecordName(message).
ZNRecord result = createEmptyStatusUpdateRecord(getStatusUpdateRecordName(message));
Map<String, String> contentMap = new TreeMap<String, String>();
// A missing message state is recorded as the literal string "NULL".
contentMap.put("Message state",
(message.getMsgState() == null ? "NULL" : message.getMsgState().toString()));
// getTaskMessages() later reads "AdditionalInfo" and "MSG_ID" back from such entries.
contentMap.put("AdditionalInfo", additionalInfo);
contentMap.put("Class", classInfo.toString());
contentMap.put("MSG_ID", message.getMsgId());
// The map-field key combines level, timestamp and a per-call random record id, so every
// status update gets its own map field.
result.setMapField(generateMapFieldId(level, getRecordIdForMessage(message)), contentMap);
return result;
// Builds the record id used as the tail of a status-update map-field key. A random UUID is
// appended so that every call yields a distinct id.
// NOTE(review): getMsgType() is used with equalsIgnoreCase(...) elsewhere in this file, which
// suggests it returns a String. If MessageType.STATE_TRANSITION is an enum constant, this
// equals() check can never be true and the else branch is always taken. getTaskMessages()
// selects keys containing "STATE_TRANSITION", which only the else-branch format would
// produce for such messages -- confirm both sides before changing this comparison.
private String getRecordIdForMessage(Message message) {
if (message.getMsgType().equals(MessageType.STATE_TRANSITION)) {
return message.getPartitionName() + " Trans:" + message.getFromState().charAt(0) + "->"
+ message.getToState().charAt(0) + " " + UUID.randomUUID().toString();
} else {
return message.getMsgType() + " " + UUID.randomUUID().toString();
// Builds a map-field key of the form "<level> <timestamp> <recordId>", with the level padded
// to at least 4 characters and the timestamp to at least 26 (both right-aligned).
private String generateMapFieldId(Level level, String recordId) {
// A new formatter is created per call (SimpleDateFormat instances are not thread-safe).
// In SimpleDateFormat 'S' is the millisecond field, so "SSSSSS" prints the millisecond value
// zero-padded to six digits, not microseconds.
DateFormat formatter = new SimpleDateFormat("yyyyMMdd-HHmmss.SSSSSS");
String time = formatter.format(new Date());
return String.format("%4s %26s ", level.toString(), time) + recordId;
// Creates a status-update record for the message and publishes it through the given data
// accessor. Any exception is caught and logged, so this method does not throw to the caller.
public void logMessageStatusUpdateRecord(Message message, Level level, Class classInfo,
String additionalInfo, HelixDataAccessor accessor) {
try {
ZNRecord record = createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
// NOTE(review): the last argument (the isController flag of publishStatusUpdateRecord) is
// not visible in this view.
publishStatusUpdateRecord(record, message, level, accessor,
} catch (Exception e) {
_logger.error("Exception while logging status update", e);
/**
 * Create a status update that is related to a cluster manager message, then record it to
 * the zookeeper store.
 * @param message
 *          the related cluster manager message
 * @param level
 *          the error level
 * @param classInfo
 *          class info about the class that reports the status update
 * @param additionalInfo
 *          the additional debug information
 * @param manager
 *          the HelixManager that writes the status update to zookeeper
 */
public void logMessageStatusUpdateRecord(Message message, Level level, Class classInfo,
String additionalInfo, HelixManager manager) {
// Any exception is caught and logged, so this method does not throw to the caller.
try {
ZNRecord record = createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
// The isController flag is derived from the manager's instance type.
// NOTE(review): the second half of the condition is cut off in this view.
publishStatusUpdateRecord(record, message, level, manager.getHelixDataAccessor(),
manager.getInstanceType().equals(InstanceType.CONTROLLER) || manager.getInstanceType()
} catch (Exception e) {
_logger.error("Exception while logging status update", e);
// Category of an error logged through logError(ErrorType, ...).
// NOTE(review): the enum constants are not visible in this view.
public enum ErrorType {
// Logs an error that is not tied to a message, taking instance name, session id, data
// accessor and instance type from the given manager. A null manager only produces a local
// log line.
public void logError(ErrorType errorType, Class classInfo, String additionalInfo, HelixManager helixManager) {
if (helixManager != null) {
// "ErrorInfo" is passed as the update key.
// NOTE(review): the second half of the isController condition is cut off in this view.
logError(errorType, "ErrorInfo", helixManager.getInstanceName(), helixManager.getSessionId(), additionalInfo,
classInfo, helixManager.getHelixDataAccessor(),
helixManager.getInstanceType().equals(InstanceType.CONTROLLER) || helixManager.getInstanceType()
} else {
_logger.error("Exception while logging error. HelixManager is null.");
private void logError(ErrorType errorType, String updateKey, String instanceName,
String sessionId, String additionalInfo, Class classInfo, HelixDataAccessor accessor,
boolean isController) {
try {
ZNRecord record = createEmptyStatusUpdateRecord(sessionId + "__" + instanceName);
Map<String, String> contentMap = new TreeMap<>();
contentMap.put("AdditionalInfo", additionalInfo);
contentMap.put("Class", classInfo.toString());
contentMap.put("SessionId", sessionId);
record.setMapField(generateMapFieldId(Level.HELIX_ERROR, updateKey), contentMap);
publishErrorRecord(record, instanceName,, updateKey, sessionId, accessor,
} catch (Exception e) {
_logger.error("Exception while logging error", e);
// Logs a HELIX_ERROR-level status update for the message via the given manager.
public void logError(Message message, Class classInfo, String additionalInfo, HelixManager manager) {
logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo, manager);
public void logError(Message message, Class classInfo, Exception e, String additionalInfo,
HelixManager manager) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo,
additionalInfo + sw.toString(), manager);
// Logs a HELIX_INFO-level status update for the message via the given manager.
public void logInfo(Message message, Class classInfo, String additionalInfo,
HelixManager manager) {
logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, additionalInfo, manager);
// Logs a HELIX_WARNING-level status update for the message via the given manager.
public void logWarning(Message message, Class classInfo, String additionalInfo,
HelixManager manager) {
logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, additionalInfo, manager);
// Logs a HELIX_ERROR-level status update for the message via the given data accessor.
public void logError(Message message, Class classInfo, String additionalInfo,
HelixDataAccessor accessor) {
logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo, accessor);
public void logError(Message message, Class classInfo, Exception e, String additionalInfo,
HelixDataAccessor accessor) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo,
additionalInfo + sw.toString(), accessor);
// Logs a HELIX_INFO-level status update for the message via the given data accessor.
public void logInfo(Message message, Class classInfo, String additionalInfo,
HelixDataAccessor accessor) {
logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, additionalInfo, accessor);
// Logs a HELIX_WARNING-level status update for the message via the given data accessor.
public void logWarning(Message message, Class classInfo, String additionalInfo,
HelixDataAccessor accessor) {
logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, additionalInfo, accessor);
// Chooses the key under which a status update is stored: the partition name when the
// message-type condition holds, otherwise the message id.
// NOTE(review): the argument of equalsIgnoreCase(...) is cut off in this view; presumably it
// names the state-transition message type -- confirm.
private String getStatusUpdateKey(Message message) {
if (message.getMsgType().equalsIgnoreCase( {
return message.getPartitionName();
return message.getMsgId();
/**
 * Generate the sub-path under the STATUSUPDATE or ERROR path for a status update.
 */
String getStatusUpdateSubPath(Message message) {
// Resource name when the message-type condition holds, otherwise the message type itself.
// NOTE(review): the argument of equalsIgnoreCase(...) is cut off in this view; presumably it
// names the state-transition message type -- confirm.
if (message.getMsgType().equalsIgnoreCase( {
return message.getResourceName();
return message.getMsgType();
// Chooses the ZNRecord id for a status update: "<targetSessionId>__<resourceName>" when the
// message-type condition holds, otherwise the message id.
// NOTE(review): the argument of equalsIgnoreCase(...) is cut off in this view; presumably it
// names the state-transition message type -- confirm.
String getStatusUpdateRecordName(Message message) {
if (message.getMsgType().equalsIgnoreCase( {
return message.getTgtSessionId() + "__" + message.getResourceName();
return message.getMsgId();
/**
 * Write a status update record to the zookeeper store.
 * @param record
 *          the status update record
 * @param message
 *          the message to be logged
 * @param level
 *          the error level of the message update
 * @param accessor
 *          the zookeeper data accessor that writes the status update to zookeeper
 * @param isController
 *          if the update is for a controller instance or not
 */
void publishStatusUpdateRecord(ZNRecord record, Message message, Level level,
HelixDataAccessor accessor, boolean isController) {
// NOTE(review): closing braces and the tails of two stateTransitionStatus(...) calls are not
// visible in this view; the comments below describe only the visible lines.
String instanceName = message.getTgtName();
String statusUpdateSubPath = getStatusUpdateSubPath(message);
String statusUpdateKey = getStatusUpdateKey(message);
// Session id fallback chain: execution session id, then target session id, then "*".
String sessionId = message.getExecutionSessionId();
if (sessionId == null) {
sessionId = message.getTgtSessionId();
if (sessionId == null) {
sessionId = "*";
Builder keyBuilder = accessor.keyBuilder();
// First status update seen for this message id: also write the message-log record built by
// createMessageLogRecord(). The check and the later put are two separate map operations.
if (!_recordedMessages.containsKey(message.getMsgId())) {
ZNRecord statusUpdateRecord = createMessageLogRecord(message);
PropertyKey propertyKey;
// Controller updates go under the controller task-status key, all others under the
// per-instance state-transition status key.
if (isController) {
propertyKey = keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey);
} else {
propertyKey =
keyBuilder.stateTransitionStatus(instanceName, sessionId, statusUpdateSubPath,
accessor.updateProperty(propertyKey, new StatusUpdate(statusUpdateRecord));
if (_logger.isTraceEnabled()) {
_logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", updates:"
+ statusUpdateRecord);
// Remember the id so the message-log record is not written again for this message.
_recordedMessages.put(message.getMsgId(), message.getMsgId());
// Write the status update itself, using the same key selection as above.
PropertyKey propertyKey;
if (isController) {
propertyKey = keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey);
} else {
propertyKey =
keyBuilder.stateTransitionStatus(instanceName, sessionId, statusUpdateSubPath,
accessor.updateProperty(propertyKey, new StatusUpdate(record));
if (_logger.isTraceEnabled()) {
_logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", updates:" + record);
// If the error level is ERROR, also write the record to "ERROR" ZNode
if (Level.HELIX_ERROR == level) {
publishErrorRecord(record, instanceName, statusUpdateSubPath, statusUpdateKey, sessionId,
accessor, isController);
/**
 * Write an error record to the zookeeper store.
 * @param record
 *          the status update record
 * @param instanceName
 *          the instance name
 * @param updateSubPath
 *          the error update sub path
 * @param updateKey
 *          the error update key
 * @param sessionId
 *          the session id
 * @param accessor
 *          the zookeeper data accessor that writes the status update to zookeeper
 * @param isController
 *          if the error log is for a controller instance or not
 */
void publishErrorRecord(ZNRecord record, String instanceName, String updateSubPath,
String updateKey, String sessionId, HelixDataAccessor accessor, boolean isController) {
// The record is always written to the local log at error level first.
_logger.error("StatusUpdate Error record: {}", record);
// NOTE(review): ERROR_LOG_TO_ZK_ENABLED is declared in this class but no use of it is visible
// in this view; presumably it gates the ZooKeeper write below -- confirm.
Builder keyBuilder = accessor.keyBuilder();
if (isController) {
// TODO need to fix: ERRORS_CONTROLLER doesn't have a form of
// ../{sessionId}/{subPath}
// The controller path is written with setProperty and keyed by sub-path only.
accessor.setProperty(keyBuilder.controllerTaskError(updateSubPath), new Error(record));
} else {
// The per-instance path is written with updateProperty and keyed by instance, session,
// sub-path and update key.
accessor.updateProperty(keyBuilder.stateTransitionError(instanceName, sessionId,
updateSubPath, updateKey), new Error(record));