// blob: 49fa841bc753e9e39204000c3fe6328f6549ba49 [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterConstraints.ConstraintValue;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Controller pipeline stage that applies the cluster's {@code MESSAGE_CONSTRAINT} constraints to
 * the messages stored in the event under {@code MESSAGES_SELECTED}, and stores the messages that
 * survive throttling under {@code MESSAGES_THROTTLE}.
 */
public class MessageThrottleStage extends AbstractBaseStage {
  private static final Logger LOG = LoggerFactory.getLogger(MessageThrottleStage.class.getName());

  /**
   * Converts a constraint value string into a numeric limit.
   * <ul>
   * <li>The token {@code ANY} maps to {@link Integer#MAX_VALUE}.</li>
   * <li>Any other {@link ConstraintValue} token is reported as invalid and also maps to
   * {@link Integer#MAX_VALUE}.</li>
   * <li>Anything else is parsed as a decimal integer; if that fails (including a {@code null}
   * input) the error is logged and {@link Integer#MAX_VALUE} is returned.</li>
   * </ul>
   *
   * @param valueStr the raw constraint value; may be {@code null}
   * @return the numeric limit, or {@link Integer#MAX_VALUE} when no usable limit could be derived
   */
  int valueOf(String valueStr) {
    if (valueStr != null) {
      ConstraintValue valueToken;
      try {
        valueToken = ConstraintValue.valueOf(valueStr);
      } catch (IllegalArgumentException notAToken) {
        // Not one of the enum tokens; fall through and try to read it as a number instead.
        valueToken = null;
      }
      if (valueToken != null) {
        if (valueToken != ConstraintValue.ANY) {
          LogUtil.logError(LOG, _eventId,
              "Invalid constraintValue token:" + valueStr + ". Use default value:"
                  + Integer.MAX_VALUE);
        }
        return Integer.MAX_VALUE;
      }
    }

    try {
      // Integer.parseInt rejects null with a NumberFormatException, so the null case is
      // reported through the same error path as any other unparsable string.
      return Integer.parseInt(valueStr);
    } catch (NumberFormatException ne) {
      LogUtil.logError(LOG, _eventId,
          "Invalid constraintValue string:" + valueStr + ". Use default value:"
              + Integer.MAX_VALUE);
      return Integer.MAX_VALUE;
    }
  }

  /**
   * Picks, for every distinct filtered key, the single constraint that should be enforced.
   * Constraints are selected according to the following rules:
   * <ol>
   * <li>constraints with CONSTRAINT_VALUE=ANY are never selected;</li>
   * <li>if one constraint is more specific than the other, the more specific one is selected;</li>
   * <li>if the constraints are of incomparable specificity, the one with the smaller value is
   * selected;</li>
   * <li>if they are of incomparable specificity and have the same value, the one whose string
   * form comes first in alphabetic order is selected.</li>
   * </ol>
   *
   * @param items candidate constraints
   * @param attributes attributes used to filter each constraint into its grouping key
   * @return the selected constraints, one per distinct key
   */
  Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
      Map<ConstraintAttribute, String> attributes) {
    Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
    final String anyToken = ConstraintValue.ANY.toString();
    for (ConstraintItem item : items) {
      // Rule 1: don't select constraints with CONSTRAINT_VALUE=ANY. The comparison is written
      // constant-first so an item without a constraint value cannot trigger a
      // NullPointerException here; such an item is later treated as unlimited by valueOf().
      if (anyToken.equals(item.getConstraintValue())) {
        continue;
      }
      String key = item.filter(attributes).toString();
      ConstraintItem existingItem = selectedItems.get(key);
      if (existingItem == null || supersedes(item, existingItem)) {
        selectedItems.put(key, item);
      }
    }
    return new HashSet<>(selectedItems.values());
  }

  /**
   * Decides whether {@code item} should replace {@code existingItem} for the same key, following
   * rules 2-4 of {@link #selectConstraints(Set, Map)}.
   */
  private boolean supersedes(ConstraintItem item, ConstraintItem existingItem) {
    if (existingItem.match(item.getAttributes())) {
      // item is more specific than existingItem
      return true;
    }
    if (item.match(existingItem.getAttributes())) {
      // existingItem is more specific than item; keep it
      return false;
    }
    // existingItem and item are of incomparable specificity: smaller value wins
    int value = valueOf(item.getConstraintValue());
    int existingValue = valueOf(existingItem.getConstraintValue());
    if (value != existingValue) {
      return value < existingValue;
    }
    // same value: the one that is ahead in alphabetic order wins
    return item.toString().compareTo(existingItem.toString()) < 0;
  }

  /**
   * Reads the data provider, the resource map and the selected messages from the event, throttles
   * the selected messages against the message constraints (if any are configured) and stores the
   * result in the event under {@code MESSAGES_THROTTLE}.
   *
   * @param event the cluster event carrying the stage inputs and receiving the output
   * @throws StageException if any of the required event attributes is missing
   */
  @Override
  public void process(ClusterEvent event) throws Exception {
    // Kept on the instance so the log helpers can tag their output with the event id.
    _eventId = event.getEventId();
    ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
    MessageOutput msgSelectionOutput =
        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
    if (cache == null || resourceMap == null || msgSelectionOutput == null) {
      throw new StageException("Missing attributes in event: " + event
          + ". Requires ResourceControllerDataProvider|RESOURCES|MESSAGES_SELECTED");
    }

    MessageOutput output = new MessageOutput();
    ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
    // Remaining quota per filtered constraint key, shared by the two passes below.
    Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();

    if (constraint != null) {
      // go through all pending messages, they should be counted but not throttled
      for (String instance : cache.getLiveInstances().keySet()) {
        throttle(throttleCounterMap, constraint, new ArrayList<>(cache.getMessages(instance)
            .values()), false);
      }
    }

    // go through all new messages, throttle if necessary
    // assume messages should be sorted by state transition priority in messageSelection stage
    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
      String resourceName = resourceEntry.getKey();
      Resource resource = resourceEntry.getValue();
      for (Partition partition : resource.getPartitions()) {
        List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
        if (constraint != null && messages != null && !messages.isEmpty()) {
          messages = throttle(throttleCounterMap, constraint, messages, true);
        }
        // Without constraints (or without messages) the selection result is passed through as is.
        output.addMessages(resourceName, partition, messages);
      }
    }

    event.addAttribute(AttributeName.MESSAGES_THROTTLE.name(), output);
  }

  /**
   * Charges every message against the constraints it matches and returns the messages that are
   * allowed through.
   *
   * @param throttleMap remaining quota per filtered constraint key; updated in place
   * @param constraint the message constraints to match against
   * @param messages the messages to account for, in the order they should be considered
   * @param needThrottle when {@code false} the messages are only counted and all of them are
   *          returned; when {@code true} a message is dropped once a matching quota is exhausted
   * @return the messages that were not throttled, in their original order
   */
  private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
      List<Message> messages, final boolean needThrottle) {
    List<Message> throttleOutputMsgs = new ArrayList<Message>();
    for (Message message : messages) {
      Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
      Set<ConstraintItem> matches = selectConstraints(constraint.match(msgAttr), msgAttr);

      boolean msgThrottled = false;
      for (ConstraintItem item : matches) {
        String key = item.filter(msgAttr).toString();
        // First use of a key seeds the counter with the constraint's limit.
        Integer quota = throttleMap.get(key);
        int remaining = (quota == null ? valueOf(item.getConstraintValue()) : quota) - 1;
        // The counter is charged for every matching constraint, even if the message ends up
        // being throttled by this or another constraint.
        throttleMap.put(key, remaining);

        if (needThrottle && remaining < 0) {
          msgThrottled = true;
          if (LOG.isDebugEnabled()) {
            LogUtil.logDebug(LOG, _eventId,
                "message: " + message + " is throttled by constraint: " + item);
          }
        }
      }

      if (!msgThrottled) {
        throttleOutputMsgs.add(message);
      }
    }
    return throttleOutputMsgs;
  }
}