| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.ha; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Arrays; |
| |
| import org.apache.geode.DataSerializable; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.serialization.ByteArrayDataInput; |
| |
| /** |
| * Class identifying a Thread uniquely across the distributed system. It is composed of two fields |
| * 1) A byte array uniquely identifying the distributed system 2) A long value unqiuely identifying |
| * the thread in the distributed system |
| * |
| * The application thread while operating on the Region gets an EventID object ( contained in |
| * EntryEventImpl) This EventID object contains a ThreadLocal field which uniquely identifies the |
| * thread by storing the Object of this class. |
| * |
| * @see EventID |
| * |
| * |
| */ |
| |
| public class ThreadIdentifier implements DataSerializable { |
| private static final long serialVersionUID = 3366884860834823186L; |
| |
| private byte[] membershipID; |
| |
| private long threadID; |
| |
| public static final long MAX_THREAD_PER_CLIENT = 1000000L; |
| public static final int MAX_BUCKET_PER_PR = 1000; |
| public static final long WAN_BITS_MASK = 0xFFFFFFFF00000000L; |
| |
| /** |
| * Generates thread ids for parallel wan usage. |
| */ |
| public enum WanType { |
| RESERVED, // original thread id incl putAll (or old format) |
| PRIMARY, // parallel new wan |
| SECONDARY, // parallel new wan |
| PARALLEL; // parallel old wan |
| |
| /** |
| * Generates a new thread id for usage in a parallel wan context. |
| * |
| * @param threadId the original thread id |
| * @param offset the thread offset |
| * @param gatewayIndex the index of the gateway |
| * @return the new thread id |
| */ |
| public long generateWanId(long threadId, long offset, int gatewayIndex) { |
| assert this != RESERVED; |
| return Bits.WAN_TYPE.shift(ordinal()) | Bits.WAN.shift(offset) |
| | Bits.GATEWAY_ID.shift(gatewayIndex) | threadId; |
| } |
| |
| /** |
| * Returns true if the supplied value is a wan thread identifier. |
| * |
| * @param tid the thread |
| * @return true if the thread id is one of the wan types |
| */ |
| public static boolean matches(long tid) { |
| return Bits.WAN_TYPE.extract(tid) > 0; |
| } |
| } |
| |
| /** |
| * Provides type-safe bitwise access to the threadID when dealing with generated values for wan id |
| * generation. |
| */ |
| public enum Bits { |
| THREAD_ID(0, 32), // bits 0-31 thread id (including fake putAll bits) |
| WAN(32, 16), // bits 32-47 wan thread index (or bucket for new wan) |
| WAN_TYPE(48, 8), // bits 48-55 thread id type |
| GATEWAY_ID(56, 7), // bits 56-62 gateway id (bit 63 would make the thread id negative) |
| RESERVED(63, 1); // bit 63 unused |
| |
| /** the beginning bit position */ |
| private final int position; |
| |
| /** the field width */ |
| private final int width; |
| |
| private Bits(int position, int width) { |
| this.position = position; |
| this.width = width; |
| } |
| |
| /** |
| * Returns the field bitmask. |
| * |
| * @return the mask |
| */ |
| public long mask() { |
| return (1L << width) - 1; |
| } |
| |
| /** |
| * Returns the value shifted into the field position. |
| * |
| * @param val the value to shift |
| * @return the shifted value |
| */ |
| public long shift(long val) { |
| assert val <= mask() : "Input value " + val + " is too large for " + this |
| + " which has a maximum of " + mask(); |
| return val << position; |
| } |
| |
| /** |
| * Extracts the field bits from the value. |
| * |
| * @param val the value |
| * @return the field |
| */ |
| public long extract(long val) { |
| return (val >> position) & mask(); |
| } |
| } |
| |
| public ThreadIdentifier() {} |
| |
| public ThreadIdentifier(final byte[] mid, long threadId) { |
| this.membershipID = mid; |
| this.threadID = threadId; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if ((obj == null) || !(obj instanceof ThreadIdentifier)) { |
| return false; |
| } |
| ThreadIdentifier other = (ThreadIdentifier) obj; |
| return (this.threadID == other.threadID |
| && EventID.equalMembershipIds(this.membershipID, other.membershipID)); |
| } |
| |
| @Override |
| public int hashCode() { |
| final int mult = 37; |
| |
| int result = EventID.hashCodeMemberId(membershipID); |
| result = mult * result + (int) this.threadID; |
| result = mult * result + (int) (this.threadID >>> 32); |
| |
| return result; |
| } |
| |
| public byte[] getMembershipID() { |
| return membershipID; |
| } |
| |
| public long getThreadID() { |
| return threadID; |
| } |
| |
| public static String toDisplayString(long tid) { |
| StringBuilder sb = new StringBuilder(); |
| long lower = Bits.THREAD_ID.extract(tid); |
| if (lower != tid) { |
| sb.append("0x"); |
| sb.append(Long.toHexString(tid >> Bits.THREAD_ID.width)); |
| sb.append("|"); |
| } |
| sb.append(lower); |
| |
| return sb.toString(); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append("ThreadId["); |
| sb.append("id=").append(membershipID.length).append("bytes; "); |
| sb.append(toDisplayString(threadID)); |
| sb.append("]"); |
| |
| return sb.toString(); |
| } |
| |
| public String expensiveToString() { |
| Object mbr; |
| try { |
| mbr = InternalDistributedMember |
| .readEssentialData(new ByteArrayDataInput(membershipID)); |
| } catch (Exception e) { |
| mbr = membershipID; // punt and use the bytes |
| } |
| |
| return "ThreadId[" + mbr + "; thread " + toDisplayString(threadID) + "]"; |
| } |
| |
| /** |
| * convert fake thread id into real thread id |
| * |
| * @param tid thread id |
| * @return real thread id |
| */ |
| public static long getRealThreadID(long tid) { |
| return Bits.THREAD_ID.extract(tid) % MAX_THREAD_PER_CLIENT; |
| } |
| |
| /** |
| * convert fake thread id into real thread id including WAN id |
| * |
| * @param tid thread id |
| * @return real thread id |
| */ |
| public static long getRealThreadIDIncludingWan(long tid) { |
| return getRealThreadID(tid) | (tid & WAN_BITS_MASK); |
| } |
| |
| /** |
| * check if current thread id is a fake thread id for putAll |
| * |
| * @param tid thread id |
| * @return whether the thread id is fake |
| */ |
| public static boolean isPutAllFakeThreadID(long tid) { |
| return Bits.THREAD_ID.extract(tid) / MAX_THREAD_PER_CLIENT > 0; |
| } |
| |
| /** |
| * check if current thread id is generated by ParallelWAN |
| * |
| * @param tid thread id |
| * @return whether the thread id is generated by ParallelGatewaySender |
| */ |
| public static boolean isParallelWANThreadID(long tid) { |
| return WanType.matches(tid) ? true : tid / MAX_THREAD_PER_CLIENT > (MAX_BUCKET_PER_PR + 2); |
| } |
| |
| /** |
| * Checks if the input thread id is a WAN_TYPE thread id |
| * |
| * @return whether the input thread id is a WAN_TYPE thread id |
| */ |
| public static boolean isWanTypeThreadID(long tid) { |
| return WanType.matches(tid); |
| } |
| |
| /** |
| * create a fake id for an operation on the given bucket |
| * |
| * @return the fake id |
| */ |
| public static long createFakeThreadIDForBulkOp(int bucketNumber, long originatingThreadId) { |
| return (MAX_THREAD_PER_CLIENT * (bucketNumber + 1) + originatingThreadId); |
| } |
| |
| /** |
| * create a fake id for an operation on the given bucket |
| * |
| * @return the fake id |
| */ |
| public static long createFakeThreadIDForParallelGSPrimaryBucket(int bucketId, |
| long originatingThreadId, int gatewayIndex) { |
| return WanType.PRIMARY.generateWanId(originatingThreadId, bucketId, gatewayIndex); |
| } |
| |
| /** |
| * create a fake id for an operation on the given bucket |
| * |
| * @return the fake id |
| */ |
| public static long createFakeThreadIDForParallelGSSecondaryBucket(int bucketId, |
| long originatingThreadId, int gatewayIndex) { |
| return WanType.SECONDARY.generateWanId(originatingThreadId, bucketId, gatewayIndex); |
| } |
| |
| /** |
| * create a fake id for an operation on the given bucket |
| * |
| * @return the fake id |
| */ |
| public static long createFakeThreadIDForParallelGateway(int index, long originatingThreadId, |
| int gatewayIndex) { |
| return WanType.PARALLEL.generateWanId(originatingThreadId, index, gatewayIndex); |
| } |
| |
| /** |
| * checks to see if the membership id of this identifier is the same as in the argument |
| * |
| * @return whether the two IDs are from the same member |
| */ |
| public boolean isSameMember(ThreadIdentifier other) { |
| return Arrays.equals(this.membershipID, other.membershipID); |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| membershipID = DataSerializer.readByteArray(in); |
| threadID = in.readLong(); |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| DataSerializer.writeByteArray(membershipID, out); |
| out.writeLong(threadID); |
| } |
| } |