| package org.apache.helix.messaging.handling; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.NotificationContext; |
| import org.apache.helix.NotificationContext.MapKey; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.model.CurrentState; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.Message.Attributes; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class BatchMessageHandler extends MessageHandler { |
| private static Logger LOG = LoggerFactory.getLogger(BatchMessageHandler.class); |
| |
| final MessageHandlerFactory _msgHandlerFty; |
| final TaskExecutor _executor; |
| final List<Message> _subMessages; |
| final List<MessageHandler> _subMessageHandlers; |
| final BatchMessageWrapper _batchMsgWrapper; |
| |
| public BatchMessageHandler(Message msg, NotificationContext context, MessageHandlerFactory fty, |
| BatchMessageWrapper wrapper, TaskExecutor executor) { |
| super(msg, context); |
| |
| if (fty == null || executor == null) { |
| throw new HelixException("MessageHandlerFactory | TaskExecutor can't be null"); |
| } |
| |
| _msgHandlerFty = fty; |
| _batchMsgWrapper = wrapper; |
| _executor = executor; |
| |
| // create sub-messages |
| _subMessages = new ArrayList<Message>(); |
| List<String> partitionKeys = _message.getPartitionNames(); |
| for (String partitionKey : partitionKeys) { |
| // assign a new message id, put batch-msg-id to parent-id field |
| Message subMsg = new Message(_message.getRecord(), UUID.randomUUID().toString()); |
| subMsg.setPartitionName(partitionKey); |
| subMsg.setAttribute(Attributes.PARENT_MSG_ID, _message.getId()); |
| subMsg.setBatchMessageMode(false); |
| |
| _subMessages.add(subMsg); |
| } |
| |
| // create sub-message handlers |
| _subMessageHandlers = createMsgHandlers(_subMessages, context); |
| } |
| |
| List<MessageHandler> createMsgHandlers(List<Message> msgs, NotificationContext context) { |
| |
| List<MessageHandler> handlers = new ArrayList<MessageHandler>(); |
| for (Message msg : msgs) { |
| MessageHandler handler = _msgHandlerFty.createHandler(msg, context); |
| handlers.add(handler); |
| } |
| return handlers; |
| } |
| |
| public void preHandleMessage() { |
| if (_message.getBatchMessageMode() == true && _batchMsgWrapper != null) { |
| _batchMsgWrapper.start(_message, _notificationContext); |
| } |
| } |
| |
| public void postHandleMessage() { |
| if (_message.getBatchMessageMode() == true && _batchMsgWrapper != null) { |
| _batchMsgWrapper.end(_message, _notificationContext); |
| } |
| |
| // update currentState |
| HelixManager manager = _notificationContext.getManager(); |
| HelixDataAccessor accessor = manager.getHelixDataAccessor(); |
| ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap = |
| (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext |
| .get(MapKey.CURRENT_STATE_UPDATE.toString()); |
| |
| if (csUpdateMap != null) { |
| Map<PropertyKey, CurrentState> csUpdate = mergeCurStateUpdate(csUpdateMap); |
| |
| // TODO: change to use asyncSet |
| for (PropertyKey key : csUpdate.keySet()) { |
| // logger.info("updateCS: " + key); |
| // System.out.println("\tupdateCS: " + key.getPath() + ", " + |
| // curStateMap.get(key)); |
| if(!accessor.updateProperty(key, csUpdate.get(key))) { |
| LOG.error( |
| "Fails to persist current state to ZK for key " + key); |
| } |
| } |
| } |
| } |
| |
| // will not return until all sub-message executions are done |
| @Override |
| public HelixTaskResult handleMessage() { |
| HelixTaskResult result = null; |
| List<Future<HelixTaskResult>> futures = null; |
| List<MessageTask> batchTasks = new ArrayList<MessageTask>(); |
| |
| synchronized (_batchMsgWrapper) { |
| try { |
| preHandleMessage(); |
| |
| int exeBatchSize = 1; // TODO: getExeBatchSize from msg |
| List<String> partitionKeys = _message.getPartitionNames(); |
| for (int i = 0; i < partitionKeys.size(); i += exeBatchSize) { |
| if (i + exeBatchSize <= partitionKeys.size()) { |
| List<Message> msgs = _subMessages.subList(i, i + exeBatchSize); |
| List<MessageHandler> handlers = _subMessageHandlers.subList(i, i + exeBatchSize); |
| HelixBatchMessageTask batchTask = |
| new HelixBatchMessageTask(_message, msgs, handlers, _notificationContext); |
| batchTasks.add(batchTask); |
| |
| } else { |
| List<Message> msgs = _subMessages.subList(i, i + partitionKeys.size()); |
| List<MessageHandler> handlers = |
| _subMessageHandlers.subList(i, i + partitionKeys.size()); |
| |
| HelixBatchMessageTask batchTask = |
| new HelixBatchMessageTask(_message, msgs, handlers, _notificationContext); |
| batchTasks.add(batchTask); |
| } |
| } |
| |
| // invokeAll() is blocking call |
| long timeout = _message.getExecutionTimeout(); |
| if (timeout == -1) { |
| timeout = Long.MAX_VALUE; |
| } |
| futures = _executor.invokeAllTasks(batchTasks, timeout, TimeUnit.MILLISECONDS); |
| } catch (Exception e) { |
| LOG.error("fail to execute batchMsg: " + _message.getId(), e); |
| result = new HelixTaskResult(); |
| result.setException(e); |
| |
| // HelixTask will call onError on this batch-msg-handler |
| // return result; |
| } |
| |
| // combine sub-results to result |
| if (futures != null) { |
| boolean isBatchTaskSucceed = true; |
| |
| for (int i = 0; i < futures.size(); i++) { |
| Future<HelixTaskResult> future = futures.get(i); |
| MessageTask subTask = batchTasks.get(i); |
| try { |
| HelixTaskResult subTaskResult = future.get(); |
| if (!subTaskResult.isSuccess()) { |
| isBatchTaskSucceed = false; |
| } |
| } catch (InterruptedException e) { |
| isBatchTaskSucceed = false; |
| LOG.error("interrupted in executing batch-msg: " + _message.getId() + ", sub-msg: " |
| + subTask.getTaskId(), e); |
| } catch (ExecutionException e) { |
| isBatchTaskSucceed = false; |
| LOG.error( |
| "fail to execute batch-msg: " + _message.getId() + ", sub-msg: " |
| + subTask.getTaskId(), e); |
| } |
| } |
| result = new HelixTaskResult(); |
| result.setSuccess(isBatchTaskSucceed); |
| } |
| |
| result.setCompleteTime(System.currentTimeMillis()); |
| // pass task-result to post-handle-msg |
| _notificationContext.add(MapKey.HELIX_TASK_RESULT.toString(), result); |
| postHandleMessage(); |
| |
| return result; |
| } |
| } |
| |
| @Override |
| public void onError(Exception e, ErrorCode code, ErrorType type) { |
| // if one sub-message execution fails, call onError on all sub-message handlers |
| for (MessageHandler handler : _subMessageHandlers) { |
| handler.onError(e, code, type); |
| } |
| } |
| |
| // TODO: optimize this based on the fact that each cs update is for a |
| // distinct partition |
| private Map<PropertyKey, CurrentState> mergeCurStateUpdate( |
| ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap) { |
| Map<String, CurrentStateUpdate> curStateUpdateMap = new HashMap<String, CurrentStateUpdate>(); |
| for (CurrentStateUpdate update : csUpdateMap.values()) { |
| String path = update._key.getPath(); // TODO: this is time |
| // consuming, optimize it |
| if (!curStateUpdateMap.containsKey(path)) { |
| curStateUpdateMap.put(path, update); |
| } else { |
| // long start = System.currentTimeMillis(); |
| curStateUpdateMap.get(path).merge(update._delta); |
| // long end = System.currentTimeMillis(); |
| // LOG.info("each merge took: " + (end - start)); |
| } |
| } |
| |
| Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>(); |
| for (CurrentStateUpdate update : curStateUpdateMap.values()) { |
| ret.put(update._key, update._delta); |
| } |
| |
| return ret; |
| } |
| |
| } |