// blob: defdd6bb6549f60b3c99b0955b7d46996e0a17fe [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.StringReader;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ImmutableList;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.util.StatusUpdateUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Factory that creates handlers for {@code SCHEDULER_MSG} messages.
 *
 * <p>The current implementation supports throttling on STATE-TRANSITION type of message,
 * transition SCHEDULED-COMPLETED.
 */
public class DefaultSchedulerMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
  /** Simple field on the scheduler message; when it parses to {@code true}, sendAndWait is used. */
  public static final String WAIT_ALL = "WAIT_ALL";
  /** Key under which the scheduler message id is stored in the task result map. */
  public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
  /**
   * Simple field on the scheduler message that names the task queue resource; the same string is
   * used as the state model definition reference of that resource.
   */
  public static final String SCHEDULER_TASK_QUEUE = "SchedulerTaskQueue";
  /** Simple field written on every queued task, holding the id of the scheduler message. */
  public static final String CONTROLLER_MSG_ID = "controllerMsgId";
  /** Value passed to {@code IdealState.setBucketSize} for the task queue resource. */
  public static final int TASKQUEUE_BUCKET_NUM = 10;

  /**
   * Callback attached to messages sent directly (not through the task queue). It collects the
   * result map of every reply and writes a summary into the status update of the original
   * scheduler message when the callback is done or has timed out.
   */
  public static class SchedulerAsyncCallback extends AsyncCallback {
    StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
    Message _originalMessage;
    HelixManager _manager;
    final Map<String, Map<String, String>> _resultSummaryMap =
        new ConcurrentHashMap<String, Map<String, String>>();

    /**
     * @param originalMessage the scheduler message whose status update receives the summary
     * @param manager the manager used to reach the data accessor
     */
    public SchedulerAsyncCallback(Message originalMessage, HelixManager manager) {
      _originalMessage = originalMessage;
      _manager = manager;
    }

    /** Logs the timeout and writes the summary collected so far, flagged as timed out. */
    @Override
    public void onTimeOut() {
      _logger.info("Scheduler msg timeout {} timout with {} Ms", _originalMessage.getMsgId(),
          _timeout);
      _statusUpdateUtil.logError(_originalMessage, SchedulerAsyncCallback.class, "Task timeout",
          _manager);
      addSummary(_resultSummaryMap, _originalMessage, _manager, true);
    }

    /**
     * Stores the reply's result map and, once {@code isDone()} reports true, writes the summary.
     *
     * @param message the reply message
     */
    @Override
    public void onReplyMessage(Message message) {
      _logger.info("Update for scheduler msg {} Message {} id {} completed",
          _originalMessage.getMsgId(), message.getMsgSrc(), message.getCorrelationId());
      // The random suffix keeps several replies from the same source from overwriting each other.
      String key = "MessageResult " + message.getMsgSrc() + " " + UUID.randomUUID();
      _resultSummaryMap.put(key, message.getResultMap());
      if (this.isDone()) {
        _logger.info("Scheduler msg {} completed", _originalMessage.getMsgId());
        _statusUpdateUtil.logInfo(_originalMessage, SchedulerAsyncCallback.class,
            "Scheduler task completed", _manager);
        addSummary(_resultSummaryMap, _originalMessage, _manager, false);
      }
    }

    /**
     * Adds a "Summary" entry to {@code resultSummaryMap} and merges the whole map into the map
     * fields of the scheduler message's controller task status.
     *
     * @param resultSummaryMap per-reply results; a "Summary" entry is added to it
     * @param originalMessage the scheduler message whose status update is written
     * @param manager the manager used to reach the data accessor
     * @param timeOut whether the summary is written because of a timeout
     */
    private void addSummary(Map<String, Map<String, String>> resultSummaryMap,
        Message originalMessage, HelixManager manager, boolean timeOut) {
      Map<String, String> summary = new TreeMap<String, String>();
      // Counted before the "Summary" entry itself is added.
      summary.put("TotalMessages:", "" + resultSummaryMap.size());
      summary.put("Timeout", "" + timeOut);
      resultSummaryMap.put("Summary", summary);

      HelixDataAccessor accessor = manager.getHelixDataAccessor();
      Builder keyBuilder = accessor.keyBuilder();
      // NOTE(review): this dereference assumes the status update already exists; a null result
      // from getProperty would raise a NullPointerException here - confirm against callers.
      ZNRecord statusUpdate =
          accessor.getProperty(
              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
                  originalMessage.getMsgId())).getRecord();
      statusUpdate.getMapFields().putAll(resultSummaryMap);
      accessor.setProperty(
          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
              originalMessage.getMsgId()), new StatusUpdate(statusUpdate));
    }
  }

  private static final Logger _logger =
      LoggerFactory.getLogger(DefaultSchedulerMessageHandlerFactory.class);

  HelixManager _manager;

  /**
   * @param manager the manager handed to every handler this factory creates
   */
  public DefaultSchedulerMessageHandlerFactory(HelixManager manager) {
    _manager = manager;
  }

  /**
   * Creates a handler for a scheduler message.
   *
   * @param message the message to handle
   * @param context the notification context passed through to the handler
   * @return a new {@link DefaultSchedulerMessageHandler}
   * @throws HelixException if the message type is not {@code SCHEDULER_MSG}
   */
  @Override
  public MessageHandler createHandler(Message message, NotificationContext context) {
    // Constant-first comparison so a null type yields the HelixException below, not an NPE.
    if (!getMessageType().equals(message.getMsgType())) {
      throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
          + message.getMsgType());
    }
    return new DefaultSchedulerMessageHandler(message, context, _manager);
  }

  /** @return the name of {@code MessageType.SCHEDULER_MSG} */
  @Override
  public String getMessageType() {
    return MessageType.SCHEDULER_MSG.name();
  }

  /** @return a single-element immutable list holding the {@code SCHEDULER_MSG} type name */
  @Override
  public List<String> getMessageTypes() {
    return ImmutableList.of(MessageType.SCHEDULER_MSG.name());
  }

  /** No-op: the factory keeps no state that needs resetting. */
  @Override
  public void reset() {
  }

  /**
   * Handles one scheduler message: either appends tasks to the task queue resource or sends the
   * templated message directly with a {@link SchedulerAsyncCallback}.
   */
  public static class DefaultSchedulerMessageHandler extends MessageHandler {
    /** Shared mapper used to parse the "Criteria" field; built once instead of per message. */
    private static final ObjectMapper CRITERIA_MAPPER = new ObjectMapper();

    HelixManager _manager;

    /**
     * @param message the scheduler message
     * @param context the notification context
     * @param manager the manager used for messaging and data access
     */
    public DefaultSchedulerMessageHandler(Message message, NotificationContext context,
        HelixManager manager) {
      super(message, context);
      _manager = manager;
    }

    /**
     * Generates the task messages for the criteria, appends them as new partitions to the task
     * queue IdealState, and records the number of queued messages in the scheduler message's
     * status update.
     *
     * @param recipientCriteria criteria selecting the recipients
     * @param messageTemplate template the task messages are generated from
     * @param controllerMsgId id of the scheduler message, stored on every task
     * @throws HelixException if the criteria names another cluster, or messages were generated
     *         but the scheduler message has no {@code SchedulerTaskQueue} field
     */
    void handleMessageUsingScheduledTaskQueue(Criteria recipientCriteria, Message messageTemplate,
        String controllerMsgId) {
      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
      Builder keyBuilder = accessor.keyBuilder();

      String clusterName = recipientCriteria.getClusterName();
      if (clusterName != null && !clusterName.equals(_manager.getClusterName())) {
        throw new HelixException(String.format(
            "ScheduledTaskQueue cannot send message to another cluster. Local cluster name %s, remote cluster name %s.",
            _manager.getClusterName(), clusterName));
      }

      Map<String, String> sendSummary = new HashMap<String, String>();
      sendSummary.put("MessageCount", "0");
      Map<InstanceType, List<Message>> messages =
          _manager.getMessagingService().generateMessage(recipientCriteria, messageTemplate);

      // Calculate tasks, and put them into the idealState of the SCHEDULER_TASK_QUEUE resource.
      // List fields are the destination node, while the Message parameters are stored in the
      // mapFields.
      // Task throttling can be done on the SCHEDULER_TASK_QUEUE resource.
      if (!messages.isEmpty()) {
        String taskQueueName = _message.getRecord().getSimpleField(SCHEDULER_TASK_QUEUE);
        if (taskQueueName == null) {
          throw new HelixException("SchedulerTaskMessage need to have " + SCHEDULER_TASK_QUEUE
              + " specified.");
        }
        IdealState newAddedScheduledTasks = new IdealState(taskQueueName);
        newAddedScheduledTasks.setBucketSize(TASKQUEUE_BUCKET_NUM);
        newAddedScheduledTasks.setStateModelDefRef(SCHEDULER_TASK_QUEUE);

        // Read-highest-id / append must not interleave between handlers sharing this manager.
        synchronized (_manager) {
          int nextPartitionId = 0;
          IdealState currentTaskQueue =
              accessor.getProperty(keyBuilder.idealStates(newAddedScheduledTasks.getId()));
          if (currentTaskQueue != null) {
            nextPartitionId = findTopPartitionId(currentTaskQueue) + 1;
          }

          // Only the first generated list is queued (same element the original array cast took).
          List<Message> taskMessages = messages.values().iterator().next();
          for (Message task : taskMessages) {
            String partitionId = taskQueueName + "_" + nextPartitionId;
            nextPartitionId++;

            String instanceName = task.getTgtName();
            newAddedScheduledTasks.setPartitionState(partitionId, instanceName, "COMPLETED");
            task.getRecord().setSimpleField(instanceName, "COMPLETED");
            task.getRecord().setSimpleField(CONTROLLER_MSG_ID, controllerMsgId);

            List<String> priorityList = new LinkedList<String>();
            priorityList.add(instanceName);
            newAddedScheduledTasks.getRecord().setListField(partitionId, priorityList);
            // Overwrites the map field written by setPartitionState with the task's fields.
            newAddedScheduledTasks.getRecord().setMapField(partitionId,
                task.getRecord().getSimpleFields());

            _logger.info("Scheduling for controllerMsg {} , sending task {} {} to {}",
                controllerMsgId, partitionId, task.getMsgId(), instanceName);
            _logger.debug("{}", task.getRecord().getSimpleFields());
          }
          accessor.updateProperty(keyBuilder.idealStates(newAddedScheduledTasks.getId()),
              newAddedScheduledTasks);
          sendSummary.put("MessageCount", "" + taskMessages.size());
        }
      }

      // Record the number of messages sent into scheduler message status updates.
      // NOTE(review): assumes the status update already exists; getProperty returning null would
      // raise a NullPointerException here - confirm.
      ZNRecord statusUpdate =
          accessor.getProperty(
              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
                  _message.getMsgId())).getRecord();
      statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
      accessor.updateProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
          _message.getMsgId()), new StatusUpdate(statusUpdate));
    }

    /**
     * Finds the largest numeric suffix (text after the last {@code '_'}) among the partition
     * names of the given task queue.
     *
     * @param currentTaskQueue the existing task queue IdealState
     * @return the largest suffix found, or 0 if no name has a numeric suffix greater than 0
     */
    private int findTopPartitionId(IdealState currentTaskQueue) {
      int topId = 0;
      for (String partitionName : currentTaskQueue.getPartitionSet()) {
        try {
          String suffix = partitionName.substring(partitionName.lastIndexOf('_') + 1);
          topId = Math.max(topId, Integer.parseInt(suffix));
        } catch (Exception e) {
          // Best effort: a partition that does not follow the <queue>_<number> pattern is skipped.
          _logger.error("Cannot extract a numeric partition id from partition name "
              + partitionName, e);
        }
      }
      return topId;
    }

    /**
     * Processes the scheduler message.
     *
     * <p>Reads the optional "TIMEOUT" field, the "MessageTemplate" map field and the JSON
     * "Criteria" field. If the recipients are participants and a task queue is named, the tasks
     * are queued; otherwise the message is sent directly (with sendAndWait when
     * {@link DefaultSchedulerMessageHandlerFactory#WAIT_ALL} is true) and the sent count is
     * written to the status update.
     *
     * @return a successful result carrying the scheduler message id, or a failed result carrying
     *         the exception when the template or criteria is missing or cannot be parsed
     * @throws HelixException if the message type is not {@code SCHEDULER_MSG}
     */
    @Override
    public HelixTaskResult handleMessage() throws InterruptedException {
      HelixTaskResult result = new HelixTaskResult();
      // Constant-first comparison so a null type yields the HelixException below, not an NPE.
      if (!MessageType.SCHEDULER_MSG.name().equals(_message.getMsgType())) {
        throw new HelixException("Unexpected msg type for message " + _message.getMsgId()
            + " type:" + _message.getMsgType());
      }
      ZNRecord schedulerRecord = _message.getRecord();

      int timeOut = parseTimeout(schedulerRecord);

      // Parse the message template
      Map<String, String> templateFields = schedulerRecord.getMapField("MessageTemplate");
      if (templateFields == null) {
        return failure(result, new HelixException("Scheduler message " + _message.getMsgId()
            + " has no MessageTemplate map field"));
      }
      ZNRecord record = new ZNRecord("templateMessage");
      record.getSimpleFields().putAll(templateFields);
      Message messageTemplate = new Message(record);

      // Parse the criteria
      String criteriaJson = schedulerRecord.getSimpleField("Criteria");
      if (criteriaJson == null) {
        return failure(result, new HelixException("Scheduler message " + _message.getMsgId()
            + " has no Criteria field"));
      }
      Criteria recipientCriteria;
      try {
        recipientCriteria =
            CRITERIA_MAPPER.readValue(new StringReader(criteriaJson), Criteria.class);
      } catch (Exception e) {
        _logger.error("Failed to parse Criteria of scheduler message " + _message.getMsgId(), e);
        return failure(result, e);
      }
      _logger.info("Scheduler sending message, criteria:{}", recipientCriteria);

      // parseBoolean accepts null and never throws: absent or malformed values mean false.
      boolean waitAll = Boolean.parseBoolean(schedulerRecord.getSimpleField(WAIT_ALL));
      boolean hasSchedulerTaskQueue =
          schedulerRecord.getSimpleFields().containsKey(SCHEDULER_TASK_QUEUE);

      // If the target is PARTICIPANT, use the ScheduledTaskQueue
      if (InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType()
          && hasSchedulerTaskQueue) {
        handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate,
            _message.getMsgId());
        result.setSuccess(true);
        result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
        result.getTaskResultMap().put("ControllerResult",
            "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
        return result;
      }

      _logger.info("Scheduler sending message to Controller");
      SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
      int nMsgsSent;
      if (waitAll) {
        nMsgsSent =
            _manager.getMessagingService().sendAndWait(recipientCriteria, messageTemplate,
                callback, timeOut);
      } else {
        nMsgsSent =
            _manager.getMessagingService().send(recipientCriteria, messageTemplate, callback,
                timeOut);
      }

      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
      Builder keyBuilder = accessor.keyBuilder();

      // Record the number of messages sent into status updates
      Map<String, String> sendSummary = new HashMap<String, String>();
      sendSummary.put("MessageCount", "" + nMsgsSent);

      // NOTE(review): assumes the status update already exists (getProperty may return null).
      // This path writes with setProperty while the task-queue path uses updateProperty -
      // confirm the difference is intentional.
      ZNRecord statusUpdate =
          accessor.getProperty(
              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
                  _message.getMsgId())).getRecord();
      statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
      accessor.setProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
          _message.getMsgId()), new StatusUpdate(statusUpdate));

      result.getTaskResultMap().put("ControllerResult",
          "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
      result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
      result.setSuccess(true);
      return result;
    }

    /**
     * Reads the optional "TIMEOUT" simple field.
     *
     * @param schedulerRecord the record of the scheduler message
     * @return the parsed timeout, or -1 when the field is absent or not an integer
     */
    private int parseTimeout(ZNRecord schedulerRecord) {
      Map<String, String> simpleFields = schedulerRecord.getSimpleFields();
      if (!simpleFields.containsKey("TIMEOUT")) {
        return -1;
      }
      String rawTimeout = simpleFields.get("TIMEOUT");
      try {
        return Integer.parseInt(rawTimeout);
      } catch (NumberFormatException e) {
        // Keep the -1 default, but make the bad value visible instead of swallowing it.
        _logger.warn("Ignoring unparsable TIMEOUT value '" + rawTimeout
            + "' on scheduler message " + _message.getMsgId(), e);
        return -1;
      }
    }

    /** Marks {@code result} as failed with the given cause and returns it. */
    private static HelixTaskResult failure(HelixTaskResult result, Exception cause) {
      result.setException(cause);
      result.setSuccess(false);
      return result;
    }

    /**
     * Logs the exception reported for this message; no other action is taken.
     *
     * @param e the exception
     * @param code the error code (unused)
     * @param type the error type (unused)
     */
    @Override
    public void onError(Exception e, ErrorCode code, ErrorType type) {
      _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
    }
  }
}