| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.dht; |
| |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.nio.ByteBuffer; |
| import java.security.MessageDigest; |
| import java.util.*; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.cassandra.db.CachedHashDecoratedKey; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.marshal.IntegerType; |
| import org.apache.cassandra.db.marshal.PartitionerDefinedOrder; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.GuidGenerator; |
| import org.apache.cassandra.utils.ObjectSizes; |
| import org.apache.cassandra.utils.Pair; |
| |
| /** |
| * This class generates a BigIntegerToken using MD5 hash. |
| */ |
| public class RandomPartitioner implements IPartitioner |
| { |
| public static final BigInteger ZERO = new BigInteger("0"); |
| public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1"); |
| public static final BigInteger MAXIMUM = new BigInteger("2").pow(127); |
| |
| /** |
| * Maintain a separate threadlocal message digest, exclusively for token hashing. This is necessary because |
| * when Tracing is enabled and using the default tracing implementation, creating the mutations for the trace |
| * events involves tokenizing the partition keys. This happens multiple times whilst servicing a ReadCommand, |
| * and so can interfere with the stateful digest calculation if the node is a replica producing a digest response. |
| */ |
| private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>() |
| { |
| @Override |
| protected MessageDigest initialValue() |
| { |
| return FBUtilities.newMessageDigest("MD5"); |
| } |
| |
| @Override |
| public MessageDigest get() |
| { |
| MessageDigest digest = super.get(); |
| digest.reset(); |
| return digest; |
| } |
| }; |
| |
| private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(hashToBigInteger(ByteBuffer.allocate(1)))); |
| |
| public static final RandomPartitioner instance = new RandomPartitioner(); |
| public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance); |
| |
| private final Splitter splitter = new Splitter(this) |
| { |
| public Token tokenForValue(BigInteger value) |
| { |
| return new BigIntegerToken(value); |
| } |
| |
| public BigInteger valueForToken(Token token) |
| { |
| return ((BigIntegerToken)token).getTokenValue(); |
| } |
| }; |
| |
| public DecoratedKey decorateKey(ByteBuffer key) |
| { |
| return new CachedHashDecoratedKey(getToken(key), key); |
| } |
| |
| public Token midpoint(Token ltoken, Token rtoken) |
| { |
| // the symbolic MINIMUM token should act as ZERO: the empty bit array |
| BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token; |
| BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token; |
| Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127); |
| // discard the remainder |
| return new BigIntegerToken(midpair.left); |
| } |
| |
| public Token split(Token ltoken, Token rtoken, double ratioToLeft) |
| { |
| BigDecimal left = ltoken.equals(MINIMUM) ? BigDecimal.ZERO : new BigDecimal(((BigIntegerToken)ltoken).token), |
| right = rtoken.equals(MINIMUM) ? BigDecimal.ZERO : new BigDecimal(((BigIntegerToken)rtoken).token), |
| ratio = BigDecimal.valueOf(ratioToLeft); |
| |
| BigInteger newToken; |
| |
| if (left.compareTo(right) < 0) |
| { |
| newToken = right.subtract(left).multiply(ratio).add(left).toBigInteger(); |
| } |
| else |
| { |
| // wrapping case |
| // L + ((R - min) + (max - L)) * ratio |
| BigDecimal max = new BigDecimal(MAXIMUM); |
| |
| newToken = max.add(right).subtract(left).multiply(ratio).add(left).toBigInteger().mod(MAXIMUM); |
| } |
| |
| assert isValidToken(newToken) : "Invalid tokens from split"; |
| |
| return new BigIntegerToken(newToken); |
| } |
| |
| public BigIntegerToken getMinimumToken() |
| { |
| return MINIMUM; |
| } |
| |
| public BigIntegerToken getRandomToken() |
| { |
| BigInteger token = hashToBigInteger(GuidGenerator.guidAsBytes()); |
| if ( token.signum() == -1 ) |
| token = token.multiply(BigInteger.valueOf(-1L)); |
| return new BigIntegerToken(token); |
| } |
| |
| public BigIntegerToken getRandomToken(Random random) |
| { |
| BigInteger token = hashToBigInteger(GuidGenerator.guidAsBytes(random, "host/127.0.0.1", 0)); |
| if ( token.signum() == -1 ) |
| token = token.multiply(BigInteger.valueOf(-1L)); |
| return new BigIntegerToken(token); |
| } |
| |
| private boolean isValidToken(BigInteger token) { |
| return token.compareTo(ZERO) >= 0 && token.compareTo(MAXIMUM) <= 0; |
| } |
| |
| private final Token.TokenFactory tokenFactory = new Token.TokenFactory() |
| { |
| public ByteBuffer toByteArray(Token token) |
| { |
| BigIntegerToken bigIntegerToken = (BigIntegerToken) token; |
| return ByteBuffer.wrap(bigIntegerToken.token.toByteArray()); |
| } |
| |
| public Token fromByteArray(ByteBuffer bytes) |
| { |
| return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes))); |
| } |
| |
| public String toString(Token token) |
| { |
| BigIntegerToken bigIntegerToken = (BigIntegerToken) token; |
| return bigIntegerToken.token.toString(); |
| } |
| |
| public void validate(String token) throws ConfigurationException |
| { |
| try |
| { |
| if(!isValidToken(new BigInteger(token))) |
| throw new ConfigurationException("Token must be >= 0 and <= 2**127"); |
| } |
| catch (NumberFormatException e) |
| { |
| throw new ConfigurationException(e.getMessage()); |
| } |
| } |
| |
| public Token fromString(String string) |
| { |
| return new BigIntegerToken(new BigInteger(string)); |
| } |
| }; |
| |
| public Token.TokenFactory getTokenFactory() |
| { |
| return tokenFactory; |
| } |
| |
| public boolean preservesOrder() |
| { |
| return false; |
| } |
| |
| public static class BigIntegerToken extends ComparableObjectToken<BigInteger> |
| { |
| static final long serialVersionUID = -5833589141319293006L; |
| |
| public BigIntegerToken(BigInteger token) |
| { |
| super(token); |
| } |
| |
| // convenience method for testing |
| @VisibleForTesting |
| public BigIntegerToken(String token) |
| { |
| this(new BigInteger(token)); |
| } |
| |
| @Override |
| public IPartitioner getPartitioner() |
| { |
| return instance; |
| } |
| |
| @Override |
| public long getHeapSize() |
| { |
| return HEAP_SIZE; |
| } |
| |
| public Token increaseSlightly() |
| { |
| return new BigIntegerToken(token.add(BigInteger.ONE)); |
| } |
| |
| public double size(Token next) |
| { |
| BigIntegerToken n = (BigIntegerToken) next; |
| BigInteger v = n.token.subtract(token); // Overflow acceptable and desired. |
| double d = Math.scalb(v.doubleValue(), -127); // Scale so that the full range is 1. |
| return d > 0.0 ? d : (d + 1.0); // Adjust for signed long, also making sure t.size(t) == 1. |
| } |
| } |
| |
| public BigIntegerToken getToken(ByteBuffer key) |
| { |
| if (key.remaining() == 0) |
| return MINIMUM; |
| |
| return new BigIntegerToken(hashToBigInteger(key)); |
| } |
| |
| public Map<Token, Float> describeOwnership(List<Token> sortedTokens) |
| { |
| Map<Token, Float> ownerships = new HashMap<Token, Float>(); |
| Iterator<Token> i = sortedTokens.iterator(); |
| |
| // 0-case |
| if (!i.hasNext()) { throw new RuntimeException("No nodes present in the cluster. Has this node finished starting up?"); } |
| // 1-case |
| if (sortedTokens.size() == 1) |
| { |
| ownerships.put(i.next(), new Float(1.0)); |
| } |
| // n-case |
| else |
| { |
| // NOTE: All divisions must take place in BigDecimals, and all modulo operators must take place in BigIntegers. |
| final BigInteger ri = MAXIMUM; // (used for addition later) |
| final BigDecimal r = new BigDecimal(ri); // The entire range, 2**127 |
| Token start = i.next(); BigInteger ti = ((BigIntegerToken)start).token; // The first token and its value |
| Token t; BigInteger tim1 = ti; // The last token and its value (after loop) |
| while (i.hasNext()) |
| { |
| t = i.next(); ti = ((BigIntegerToken)t).token; // The next token and its value |
| float x = new BigDecimal(ti.subtract(tim1).add(ri).mod(ri)).divide(r).floatValue(); // %age = ((T(i) - T(i-1) + R) % R) / R |
| ownerships.put(t, x); // save (T(i) -> %age) |
| tim1 = ti; // -> advance loop |
| } |
| // The start token's range extends backward to the last token, which is why both were saved above. |
| float x = new BigDecimal(((BigIntegerToken)start).token.subtract(ti).add(ri).mod(ri)).divide(r).floatValue(); |
| ownerships.put(start, x); |
| } |
| return ownerships; |
| } |
| |
| public Token getMaximumToken() |
| { |
| return new BigIntegerToken(MAXIMUM); |
| } |
| |
| public AbstractType<?> getTokenValidator() |
| { |
| return IntegerType.instance; |
| } |
| |
| public AbstractType<?> partitionOrdering() |
| { |
| return partitionOrdering; |
| } |
| |
| public Optional<Splitter> splitter() |
| { |
| return Optional.of(splitter); |
| } |
| |
| private static BigInteger hashToBigInteger(ByteBuffer data) |
| { |
| MessageDigest messageDigest = localMD5Digest.get(); |
| if (data.hasArray()) |
| messageDigest.update(data.array(), data.arrayOffset() + data.position(), data.remaining()); |
| else |
| messageDigest.update(data.duplicate()); |
| |
| return new BigInteger(messageDigest.digest()).abs(); |
| } |
| } |