blob: fab07e4a8d7a88983d181e8a0ecc660371c68e66 [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class MessageDispatchStage extends AbstractBaseStage {
private static Logger logger = LoggerFactory.getLogger(MessageDispatchStage.class);
protected void processEvent(ClusterEvent event, MessageOutput messageOutput) throws Exception {
_eventId = event.getEventId();
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
if (manager == null || resourceMap == null || messageOutput == null || cache == null
|| liveInstanceMap == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
List<Message> messagesToSend = new ArrayList<>();
for (String resourceName : resourceMap.keySet()) {
Resource resource = resourceMap.get(resourceName);
for (Partition partition : resource.getPartitions()) {
List<Message> messages = messageOutput.getMessages(resourceName, partition);
if (messages == null || messages.isEmpty()) {
messages = Collections.emptyList();
}
messagesToSend.addAll(messages);
}
}
List<Message> outputMessages =
batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
manager.getProperties());
// Only expect tests' events don't have EVENT_SESSION, while all events in prod should have it.
if (!event.containsAttribute(AttributeName.EVENT_SESSION.name())) {
logger.info("Event {} does not have event session attribute", event.getEventId());
} else {
// An early check for expected leader session. If the sessions don't match, it means the
// controller's session changes, then messages should not be sent and pipeline should stop.
Optional<String> expectedSession = event.getAttribute(AttributeName.EVENT_SESSION.name());
if (!expectedSession.isPresent() || !expectedSession.get().equals(manager.getSessionId())) {
throw new StageException(String.format(
"Event session doesn't match controller %s session! Expected session: %s, actual: %s",
manager.getInstanceName(), expectedSession.orElse("NOT_PRESENT"), manager.getSessionId()));
}
}
List<Message> messagesSent = sendMessages(dataAccessor, outputMessages);
// TODO: Need also count messages from task rebalancer
if (!(cache instanceof WorkflowControllerDataProvider)) {
ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
if (clusterStatusMonitor != null) {
clusterStatusMonitor.increaseMessageReceived(outputMessages);
}
}
long cacheStart = System.currentTimeMillis();
cache.cacheMessages(messagesSent);
long cacheEnd = System.currentTimeMillis();
LogUtil.logDebug(logger, _eventId, "Caching messages took " + (cacheEnd - cacheStart) + " ms");
}
List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
Map<String, Resource> resourceMap, Map<String, LiveInstance> liveInstanceMap,
HelixManagerProperties properties) {
// group messages by its CurrentState path + "/" + fromState + "/" + toState
Map<String, Message> batchMessages = new HashMap<String, Message>();
List<Message> outputMessages = new ArrayList<Message>();
Iterator<Message> iter = messages.iterator();
while (iter.hasNext()) {
Message message = iter.next();
String resourceName = message.getResourceName();
Resource resource = resourceMap.get(resourceName);
String instanceName = message.getTgtName();
LiveInstance liveInstance = liveInstanceMap.get(instanceName);
String participantVersion = null;
if (liveInstance != null) {
participantVersion = liveInstance.getHelixVersion();
}
if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
|| !properties.isFeatureSupported("batch_message", participantVersion)) {
outputMessages.add(message);
continue;
}
String key =
keyBuilder.currentState(message.getTgtName(), message.getTgtSessionId(),
message.getResourceName()).getPath()
+ "/" + message.getFromState() + "/" + message.getToState();
if (!batchMessages.containsKey(key)) {
Message batchMessage = new Message(message.getRecord());
batchMessage.setBatchMessageMode(true);
outputMessages.add(batchMessage);
batchMessages.put(key, batchMessage);
}
batchMessages.get(key).addPartitionName(message.getPartitionName());
}
return outputMessages;
}
// return the messages actually sent
protected List<Message> sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
List<Message> messageSent = new ArrayList<>();
if (messages == null || messages.isEmpty()) {
return messageSent;
}
Builder keyBuilder = dataAccessor.keyBuilder();
List<PropertyKey> keys = new ArrayList<PropertyKey>();
for (Message message : messages) {
LogUtil.logInfo(
logger, _eventId,
"Sending Message " + message.getMsgId() + " to " + message.getTgtName() + " transit "
+ message.getResourceName() + "." + message.getPartitionName() + "|" + message
.getPartitionNames() + " from:" + message.getFromState() + " to:" + message
.getToState() + ", relayMessages: " + message.getRelayMessages().size());
if (message.hasRelayMessages()) {
for (Message msg : message.getRelayMessages().values()) {
LogUtil.logInfo(logger, _eventId,
"Sending Relay Message " + msg.getMsgId() + " to " + msg.getTgtName() + " transit "
+ msg.getResourceName() + "." + msg.getPartitionName() + "|" + msg
.getPartitionNames() + " from:" + msg.getFromState() + " to:" + msg.getToState()
+ ", relayFrom: " + msg.getRelaySrcHost() + ", attached to message: " + message
.getMsgId());
}
}
keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
}
boolean[] results = dataAccessor.createChildren(keys, new ArrayList<>(messages));
for (int i = 0; i < results.length; i++) {
if (!results[i]) {
LogUtil.logError(logger, _eventId, "Failed to send message: " + keys.get(i));
} else {
messageSent.add(messages.get(i));
}
}
return messageSent;
}
}