| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.rpc.cluster.loadbalance; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.io.Bytes; |
| import org.apache.dubbo.rpc.Invocation; |
| import org.apache.dubbo.rpc.Invoker; |
| import org.apache.dubbo.rpc.support.RpcUtils; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; |
| |
| /** |
| * ConsistentHashLoadBalance |
| */ |
| public class ConsistentHashLoadBalance extends AbstractLoadBalance { |
| public static final String NAME = "consistenthash"; |
| |
| /** |
| * Hash nodes name |
| */ |
| public static final String HASH_NODES = "hash.nodes"; |
| |
| /** |
| * Hash arguments name |
| */ |
| public static final String HASH_ARGUMENTS = "hash.arguments"; |
| |
| private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<>(); |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { |
| String methodName = RpcUtils.getMethodName(invocation); |
| String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; |
| // using the hashcode of list to compute the hash only pay attention to the elements in the list |
| int invokersHashCode = invokers.hashCode(); |
| ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); |
| if (selector == null || selector.identityHashCode != invokersHashCode) { |
| selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode)); |
| selector = (ConsistentHashSelector<T>) selectors.get(key); |
| } |
| return selector.select(invocation); |
| } |
| |
| private static final class ConsistentHashSelector<T> { |
| |
| private final TreeMap<Long, Invoker<T>> virtualInvokers; |
| |
| private final int replicaNumber; |
| |
| private final int identityHashCode; |
| |
| private final int[] argumentIndex; |
| |
| ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { |
| this.virtualInvokers = new TreeMap<>(); |
| this.identityHashCode = identityHashCode; |
| URL url = invokers.get(0).getUrl(); |
| this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); |
| String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); |
| argumentIndex = new int[index.length]; |
| for (int i = 0; i < index.length; i++) { |
| argumentIndex[i] = Integer.parseInt(index[i]); |
| } |
| for (Invoker<T> invoker : invokers) { |
| String address = invoker.getUrl().getAddress(); |
| for (int i = 0; i < replicaNumber / 4; i++) { |
| byte[] digest = Bytes.getMD5(address + i); |
| for (int h = 0; h < 4; h++) { |
| long m = hash(digest, h); |
| virtualInvokers.put(m, invoker); |
| } |
| } |
| } |
| } |
| |
| public Invoker<T> select(Invocation invocation) { |
| String key = toKey(RpcUtils.getArguments(invocation)); |
| |
| byte[] digest = Bytes.getMD5(key); |
| return selectForKey(hash(digest, 0)); |
| } |
| |
| private String toKey(Object[] args) { |
| StringBuilder buf = new StringBuilder(); |
| for (int i : argumentIndex) { |
| if (i >= 0 && args != null && i < args.length) { |
| buf.append(args[i]); |
| } |
| } |
| return buf.toString(); |
| } |
| |
| private Invoker<T> selectForKey(long hash) { |
| Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); |
| if (entry == null) { |
| entry = virtualInvokers.firstEntry(); |
| } |
| return entry.getValue(); |
| } |
| |
| private long hash(byte[] digest, int number) { |
| return (((long) (digest[3 + number * 4] & 0xFF) << 24) |
| | ((long) (digest[2 + number * 4] & 0xFF) << 16) |
| | ((long) (digest[1 + number * 4] & 0xFF) << 8) |
| | (digest[number * 4] & 0xFF)) |
| & 0xFFFFFFFFL; |
| } |
| } |
| } |