blob: b7a422a3c4d50a611d09b0449a944d5c72eb3662 [file] [log] [blame]
package org.apache.helix.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.Error;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StatusUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Utility to create status-update and error ZNRecords and publish them through a
 * {@link HelixDataAccessor}. These records are for diagnostics only; participant updates are
 * written under the instance's state-transition status / error paths and controller updates under
 * the controller task status / error paths.
 * <p>
 * NOTE(review): an instance keeps state in {@code _recordedMessages} (see
 * {@link #publishStatusUpdateRecord}), so reusing one instance is what deduplicates the one-time
 * message-content log.
 */
public class StatusUpdateUtil {
  static final Logger _logger = LoggerFactory.getLogger(StatusUpdateUtil.class);

  /**
   * A single state transition extracted from a status-update record: the id of the triggering
   * message, its creation timestamp, and its from/to states.
   * <p>
   * The natural ordering ({@link #compareTo}) uses the timestamp only, while
   * {@link #equals(Object)} also compares the from/to states, so ordering is not consistent with
   * equals. The message id takes part in neither.
   */
  public static class Transition implements Comparable<Transition> {
    private final String _msgID;
    private final long _timeStamp;
    private final String _from;
    private final String _to;

    public Transition(String msgID, long timeStamp, String from, String to) {
      _msgID = msgID;
      _timeStamp = timeStamp;
      _from = from;
      _to = to;
    }

    /** Orders transitions from the earliest to the latest timestamp. */
    @Override
    public int compareTo(Transition t) {
      return Long.compare(_timeStamp, t._timeStamp);
    }

    /**
     * Two transitions are equal when their timestamps and from/to states match; from/to may be
     * null (they come straight from record fields).
     */
    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Transition)) {
        return false;
      }
      Transition t = (Transition) o;
      return _timeStamp == t._timeStamp && Objects.equals(_from, t._from)
          && Objects.equals(_to, t._to);
    }

    /**
     * Kept for binary compatibility with callers compiled against the old overload; delegates to
     * {@link #equals(Object)}.
     */
    public boolean equals(Transition t) {
      return equals((Object) t);
    }

    @Override
    public int hashCode() {
      return Objects.hash(_timeStamp, _from, _to);
    }

    public String getFromState() {
      return _from;
    }

    public String getToState() {
      return _to;
    }

    public String getMsgID() {
      return _msgID;
    }

    @Override
    public String toString() {
      return _msgID + ":" + _timeStamp + ":" + _from + "->" + _to;
    }
  }

  /**
   * Coarse task status derived from the free-text "AdditionalInfo" of status updates. This class
   * only ever produces UNKNOWN, SCHEDULED, INVOKING and COMPLETED.
   */
  public static enum TaskStatus {
    UNKNOWN,
    NEW,
    SCHEDULED,
    INVOKING,
    COMPLETED,
    FAILED
  }

  /**
   * The transitions and task statuses found in the status-update records of one
   * (instance, resource group, partition) tuple, optionally restricted to one session.
   */
  public static class StatusUpdateContents {
    private final List<Transition> _transitions;
    private final Map<String, TaskStatus> _taskMessages;

    private StatusUpdateContents(List<Transition> transitions, Map<String, TaskStatus> taskMessages) {
      _transitions = transitions;
      _taskMessages = taskMessages;
    }

    /** Same as the five-argument overload with a null session id (search all sessions). */
    public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
        String instance, String resourceGroup, String partition) {
      return getStatusUpdateContents(accessor, instance, resourceGroup, null, partition);
    }

    /**
     * Collects the status-update records stored for the given tuple and summarizes them.
     * <p>
     * TODO: build a lookup map instead of walking the tree on every call; such a map would be very
     * similar to what ZNRecord already holds.
     *
     * @param accessor data accessor used for the reads
     * @param instance instance name; must also appear among the instance configs
     * @param resourceGroup resource name under the state-transition status path
     * @param sessionID session to restrict to, or null to search across all sessions
     * @param partition partition name under the resource
     * @return transitions sorted by timestamp plus the last observed status per message id
     */
    public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
        String instance, String resourceGroup, String sessionID, String partition) {
      Builder keyBuilder = accessor.keyBuilder();
      List<ZNRecord> instances =
          HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
      List<ZNRecord> partitionRecords = new ArrayList<ZNRecord>();
      for (ZNRecord znRecord : instances) {
        String instanceName = znRecord.getId();
        if (!instanceName.equals(instance)) {
          continue;
        }
        List<String> sessions = accessor.getChildNames(keyBuilder.sessions(instanceName));
        for (String session : sessions) {
          if (sessionID != null && !session.equals(sessionID)) {
            continue;
          }
          List<String> resourceGroups =
              accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName, session));
          for (String resourceGroupName : resourceGroups) {
            if (!resourceGroupName.equals(resourceGroup)) {
              continue;
            }
            List<String> partitionStrings = accessor.getChildNames(
                keyBuilder.stateTransitionStatus(instanceName, session, resourceGroupName));
            for (String partitionString : partitionStrings) {
              // Filter by name before reading so non-matching partitions cost no extra read.
              if (!partitionString.equals(partition)) {
                continue;
              }
              HelixProperty property = accessor.getProperty(keyBuilder.stateTransitionStatus(
                  instanceName, session, resourceGroupName, partitionString));
              // The node may have been removed between listing it and reading it.
              if (property != null) {
                partitionRecords.add(property.getRecord());
              }
            }
          }
        }
      }
      return new StatusUpdateContents(getSortedTransitions(partitionRecords),
          getTaskMessages(partitionRecords));
    }

    public List<Transition> getTransitions() {
      return _transitions;
    }

    public Map<String, TaskStatus> getTaskMessages() {
      return _taskMessages;
    }

    /**
     * Extracts one Transition per "MESSAGE..." map field across the given records.
     *
     * @param partitionRecords records for one (instance, resource, partition), any sessions
     * @return transitions sorted from earliest to latest CREATE_TIMESTAMP
     */
    private static List<Transition> getSortedTransitions(List<ZNRecord> partitionRecords) {
      List<Transition> transitions = new ArrayList<Transition>();
      for (ZNRecord partition : partitionRecords) {
        for (Map.Entry<String, Map<String, String>> entry : partition.getMapFields().entrySet()) {
          if (!entry.getKey().startsWith("MESSAGE")) {
            continue;
          }
          Map<String, String> m = entry.getValue();
          long createTimeStamp = 0;
          try {
            createTimeStamp = Long.parseLong(m.get("CREATE_TIMESTAMP"));
          } catch (NumberFormatException ignored) {
            // Missing or malformed timestamp: keep 0 so the transition still appears (sorted early).
          }
          transitions.add(new Transition(m.get("MSG_ID"), createTimeStamp, m.get("FROM_STATE"),
              m.get("TO_STATE")));
        }
      }
      Collections.sort(transitions);
      return transitions;
    }

    /**
     * Maps each MSG_ID found in map fields whose key contains "STATE_TRANSITION" to a coarse
     * status. When a message id appears several times, the entry visited last in the map's
     * iteration order wins.
     */
    private static Map<String, TaskStatus> getTaskMessages(List<ZNRecord> partitionRecords) {
      Map<String, TaskStatus> taskMessages = new HashMap<String, TaskStatus>();
      for (ZNRecord partition : partitionRecords) {
        for (Map.Entry<String, Map<String, String>> entry : partition.getMapFields().entrySet()) {
          if (entry.getKey().contains("STATE_TRANSITION")) {
            Map<String, String> m = entry.getValue();
            taskMessages.put(m.get("MSG_ID"), parseTaskStatus(m.get("AdditionalInfo")));
          }
        }
      }
      return taskMessages;
    }

    /** Classifies a status-update's free text; null or unrecognized text yields UNKNOWN. */
    private static TaskStatus parseTaskStatus(String statusString) {
      if (statusString == null) {
        return TaskStatus.UNKNOWN;
      }
      if (statusString.contains("scheduled")) {
        return TaskStatus.SCHEDULED;
      }
      if (statusString.contains("invoking")) {
        return TaskStatus.INVOKING;
      }
      if (statusString.contains("completed")) {
        return TaskStatus.COMPLETED;
      }
      return TaskStatus.UNKNOWN;
    }
  }

  /** Severity of a status update; HELIX_ERROR updates are also written to the error path. */
  public enum Level {
    HELIX_ERROR,
    HELIX_WARNING,
    HELIX_INFO
  }

  /**
   * Creates an empty ZNRecord to be used as a status-update or error record.
   *
   * @param id the record id
   * @return a new record with the given id and no fields
   */
  public ZNRecord createEmptyStatusUpdateRecord(String id) {
    return new ZNRecord(id);
  }

  /**
   * Creates a record holding the full content of a message. The message's simple fields are copied
   * into the map field {@code "MESSAGE <msgId>"}, so that the update can be merged into the
   * existing status-update record (see ZNRecord.merge()). A non-null message result map is stored
   * under "MessageResult".
   */
  ZNRecord createMessageLogRecord(Message message) {
    ZNRecord result = new ZNRecord(getStatusUpdateRecordName(message));
    result.setMapField("MESSAGE " + message.getMsgId(),
        new TreeMap<String, String>(message.getRecord().getSimpleFields()));
    if (message.getResultMap() != null) {
      result.setMapField("MessageResult", message.getResultMap());
    }
    return result;
  }

  /**
   * Message ids whose full content has already been logged by this instance (key and value are
   * both the message id). NOTE(review): entries are never removed, so this grows with every
   * distinct message id seen during the instance's lifetime.
   */
  Map<String, String> _recordedMessages = new ConcurrentHashMap<>();

  /**
   * Creates a status-update record related to a cluster manager message.
   *
   * @param message the related cluster manager message
   * @param level the severity level
   * @param classInfo the class that reports the status update
   * @param additionalInfo additional debug information
   * @return a record with one map field keyed by level, timestamp and a record id
   */
  public ZNRecord createMessageStatusUpdateRecord(Message message, Level level, Class<?> classInfo,
      String additionalInfo) {
    ZNRecord result = createEmptyStatusUpdateRecord(getStatusUpdateRecordName(message));
    Map<String, String> contentMap = new TreeMap<String, String>();
    contentMap.put("Message state",
        (message.getMsgState() == null ? "NULL" : message.getMsgState().toString()));
    contentMap.put("AdditionalInfo", additionalInfo);
    contentMap.put("Class", classInfo.toString());
    contentMap.put("MSG_ID", message.getMsgId());
    result.setMapField(generateMapFieldId(level, getRecordIdForMessage(message)), contentMap);
    return result;
  }

  /**
   * Builds the record id appended to a status-update map-field key:
   * {@code "<msgType> <random UUID>"}.
   * <p>
   * NOTE(review): this used to contain a STATE_TRANSITION-only branch
   * ({@code "<partition> Trans:<f>-><t> <uuid>"}) guarded by
   * {@code message.getMsgType().equals(MessageType.STATE_TRANSITION)}. getMsgType() is a String, so
   * that comparison with the enum constant was always false and the branch never ran. The effective
   * format is kept on purpose: StatusUpdateContents.getTaskMessages() finds state-transition
   * updates by looking for "STATE_TRANSITION" inside these keys.
   */
  private String getRecordIdForMessage(Message message) {
    return message.getMsgType() + " " + UUID.randomUUID().toString();
  }

  /**
   * Builds a map-field key of the form {@code "<level> <padded timestamp> <recordId>"}.
   * <p>
   * Note: in SimpleDateFormat "S" is milliseconds, so "SSSSSS" zero-pads the millisecond value to
   * six digits; it is not microsecond precision. The pattern is kept so key formats stay stable.
   * The formatter is created per call because SimpleDateFormat is not thread-safe.
   */
  private String generateMapFieldId(Level level, String recordId) {
    DateFormat formatter = new SimpleDateFormat("yyyyMMdd-HHmmss.SSSSSS");
    String time = formatter.format(new Date());
    return String.format("%4s %26s ", level.toString(), time) + recordId;
  }

  /**
   * Creates and publishes a message status update; failures are logged, not thrown. The update is
   * treated as a controller update when the message target name is "CONTROLLER" (ignoring case).
   *
   * @deprecated use
   *             {@link #logMessageStatusUpdateRecord(Message, Level, Class, String, HelixManager)}
   */
  @Deprecated
  public void logMessageStatusUpdateRecord(Message message, Level level, Class<?> classInfo,
      String additionalInfo, HelixDataAccessor accessor) {
    try {
      ZNRecord record = createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
      publishStatusUpdateRecord(record, message, level, accessor,
          message.getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name()));
    } catch (Exception e) {
      _logger.error("Exception while logging status update", e);
    }
  }

  /**
   * Creates a status update related to a cluster manager message, then writes it to ZooKeeper.
   * Failures are logged, not thrown.
   *
   * @param message the related cluster manager message
   * @param level the severity level
   * @param classInfo the class that reports the status update
   * @param additionalInfo additional debug information
   * @param manager the HelixManager whose data accessor writes the update; its instance type
   *          decides whether controller or participant paths are used
   */
  public void logMessageStatusUpdateRecord(Message message, Level level, Class<?> classInfo,
      String additionalInfo, HelixManager manager) {
    try {
      ZNRecord record = createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
      publishStatusUpdateRecord(record, message, level, manager.getHelixDataAccessor(),
          isControllerInstance(manager));
    } catch (Exception e) {
      _logger.error("Exception while logging status update", e);
    }
  }

  /** True when the manager runs as a controller (including the combined controller-participant). */
  private static boolean isControllerInstance(HelixManager manager) {
    InstanceType type = manager.getInstanceType();
    return type == InstanceType.CONTROLLER || type == InstanceType.CONTROLLER_PARTICIPANT;
  }

  /** Error categories that are not tied to a specific message. */
  public enum ErrorType {
    RebalanceResourceFailure,
  }

  /**
   * Records an error that is not tied to a message, under the "ErrorInfo" key of the manager's
   * current session. Logs and returns if {@code helixManager} is null.
   */
  public void logError(ErrorType errorType, Class<?> classInfo, String additionalInfo,
      HelixManager helixManager) {
    if (helixManager != null) {
      logError(errorType, "ErrorInfo", helixManager.getInstanceName(),
          helixManager.getSessionId(), additionalInfo, classInfo,
          helixManager.getHelixDataAccessor(), isControllerInstance(helixManager));
    } else {
      _logger.error("Exception while logging error. HelixManager is null.");
    }
  }

  /** Builds a HELIX_ERROR record and publishes it to the error path; failures are logged. */
  private void logError(ErrorType errorType, String updateKey, String instanceName,
      String sessionId, String additionalInfo, Class<?> classInfo, HelixDataAccessor accessor,
      boolean isController) {
    try {
      ZNRecord record = createEmptyStatusUpdateRecord(sessionId + "__" + instanceName);
      Map<String, String> contentMap = new TreeMap<>();
      contentMap.put("AdditionalInfo", additionalInfo);
      contentMap.put("Class", classInfo.toString());
      contentMap.put("SessionId", sessionId);
      record.setMapField(generateMapFieldId(Level.HELIX_ERROR, updateKey), contentMap);
      publishErrorRecord(record, instanceName, errorType.name(), updateKey, sessionId, accessor,
          isController);
    } catch (Exception e) {
      _logger.error("Exception while logging error", e);
    }
  }

  /** Renders a throwable's stack trace; a null throwable renders as the empty string. */
  private static String stackTraceToString(Throwable e) {
    if (e == null) {
      return "";
    }
    StringWriter sw = new StringWriter();
    try (PrintWriter pw = new PrintWriter(sw)) {
      e.printStackTrace(pw);
    }
    return sw.toString();
  }

  public void logError(Message message, Class<?> classInfo, String additionalInfo,
      HelixManager manager) {
    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo, manager);
  }

  /** Logs an error whose additional info is followed directly by the exception's stack trace. */
  public void logError(Message message, Class<?> classInfo, Exception e, String additionalInfo,
      HelixManager manager) {
    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo,
        additionalInfo + stackTraceToString(e), manager);
  }

  public void logInfo(Message message, Class<?> classInfo, String additionalInfo,
      HelixManager manager) {
    logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, additionalInfo, manager);
  }

  public void logWarning(Message message, Class<?> classInfo, String additionalInfo,
      HelixManager manager) {
    logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, additionalInfo, manager);
  }

  /** @deprecated use {@link #logError(Message, Class, String, HelixManager)} */
  @Deprecated
  public void logError(Message message, Class<?> classInfo, String additionalInfo,
      HelixDataAccessor accessor) {
    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo, accessor);
  }

  /** @deprecated use {@link #logError(Message, Class, Exception, String, HelixManager)} */
  @Deprecated
  public void logError(Message message, Class<?> classInfo, Exception e, String additionalInfo,
      HelixDataAccessor accessor) {
    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo,
        additionalInfo + stackTraceToString(e), accessor);
  }

  /** @deprecated use {@link #logInfo(Message, Class, String, HelixManager)} */
  @Deprecated
  public void logInfo(Message message, Class<?> classInfo, String additionalInfo,
      HelixDataAccessor accessor) {
    logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, additionalInfo, accessor);
  }

  /** @deprecated use {@link #logWarning(Message, Class, String, HelixManager)} */
  @Deprecated
  public void logWarning(Message message, Class<?> classInfo, String additionalInfo,
      HelixDataAccessor accessor) {
    logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, additionalInfo, accessor);
  }

  /** True when the message type is STATE_TRANSITION (case-insensitive). */
  private static boolean isStateTransition(Message message) {
    return message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name());
  }

  /** Leaf key of a status update: the partition for state transitions, else the message id. */
  private String getStatusUpdateKey(Message message) {
    return isStateTransition(message) ? message.getPartitionName() : message.getMsgId();
  }

  /**
   * Generates the sub-path under the status-update or error path: the resource name for state
   * transitions, otherwise the message type.
   */
  String getStatusUpdateSubPath(Message message) {
    return isStateTransition(message) ? message.getResourceName() : message.getMsgType();
  }

  /**
   * Record id for a message's status updates: {@code "<tgtSessionId>__<resource>"} for state
   * transitions, otherwise the message id.
   */
  String getStatusUpdateRecordName(Message message) {
    if (isStateTransition(message)) {
      return message.getTgtSessionId() + "__" + message.getResourceName();
    }
    return message.getMsgId();
  }

  /**
   * Writes a status-update record to ZooKeeper. The first time this instance sees a message id, the
   * full message content (see {@link #createMessageLogRecord}) is written first. HELIX_ERROR
   * updates are additionally published to the error path.
   *
   * @param record the status-update record
   * @param message the message being logged
   * @param level the severity level of the update
   * @param accessor the data accessor that performs the writes
   * @param isController whether to write to controller paths instead of participant paths
   */
  void publishStatusUpdateRecord(ZNRecord record, Message message, Level level,
      HelixDataAccessor accessor, boolean isController) {
    String instanceName = message.getTgtName();
    String statusUpdateSubPath = getStatusUpdateSubPath(message);
    String statusUpdateKey = getStatusUpdateKey(message);
    // Prefer the execution session, then the target session; "*" if neither is known.
    String sessionId = message.getExecutionSessionId();
    if (sessionId == null) {
      sessionId = message.getTgtSessionId();
    }
    if (sessionId == null) {
      sessionId = "*";
    }
    Builder keyBuilder = accessor.keyBuilder();
    PropertyKey propertyKey = isController
        ? keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey)
        : keyBuilder.stateTransitionStatus(instanceName, sessionId, statusUpdateSubPath,
            statusUpdateKey);

    // Mark the message as recorded only after its content was written, so a failed write is
    // retried on the next update. NOTE(review): check-then-put is not atomic; concurrent callers
    // may both write the content record.
    if (!_recordedMessages.containsKey(message.getMsgId())) {
      writeStatusUpdate(accessor, propertyKey, createMessageLogRecord(message), isController);
      _recordedMessages.put(message.getMsgId(), message.getMsgId());
    }
    writeStatusUpdate(accessor, propertyKey, record, isController);

    if (Level.HELIX_ERROR == level) {
      publishErrorRecord(record, instanceName, statusUpdateSubPath, statusUpdateKey, sessionId,
          accessor, isController);
    }
  }

  /** Merges {@code record} into the node at {@code key}; participant writes are also traced. */
  private void writeStatusUpdate(HelixDataAccessor accessor, PropertyKey key, ZNRecord record,
      boolean isController) {
    if (!isController && _logger.isTraceEnabled()) {
      _logger.trace("StatusUpdate path:{}, updates:{}", key.getPath(), record);
    }
    accessor.updateProperty(key, new StatusUpdate(record));
  }

  /**
   * Writes an error record to ZooKeeper.
   *
   * @param record the error record
   * @param instanceName the instance name
   * @param updateSubPath the error update sub-path
   * @param updateKey the error update key
   * @param sessionId the session id
   * @param accessor the data accessor that performs the write
   * @param isController whether the error is for a controller; controller errors are written with
   *          setProperty (replacing the node) under the sub-path only, participant errors are
   *          merged with updateProperty
   */
  void publishErrorRecord(ZNRecord record, String instanceName, String updateSubPath,
      String updateKey, String sessionId, HelixDataAccessor accessor, boolean isController) {
    Builder keyBuilder = accessor.keyBuilder();
    if (isController) {
      // TODO need to fix: ERRORS_CONTROLLER doesn't have a form of ../{sessionId}/{subPath}
      accessor.setProperty(keyBuilder.controllerTaskError(updateSubPath), new Error(record));
    } else {
      accessor.updateProperty(keyBuilder.stateTransitionError(instanceName, sessionId,
          updateSubPath, updateKey), new Error(record));
    }
  }
}