blob: ed7af75032c10a07309c70b48b9d814998a0aeea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.ha;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.cache.EventID;
/**
* Class identifying a Thread uniquely across the distributed system. It is composed of two fields:
* 1) a byte array uniquely identifying the distributed system, and 2) a long value uniquely
* identifying the thread in the distributed system.
*
* The application thread, while operating on the Region, gets an EventID object (contained in
* EntryEventImpl). This EventID object contains a ThreadLocal field which uniquely identifies the
* thread by storing an object of this class.
*
* @see EventID
*
*
*/
public class ThreadIdentifier implements DataSerializable {
  private static final long serialVersionUID = 3366884860834823186L;

  /** Serialized membership id; stays null after the no-arg constructor until fromData runs. */
  private byte[] membershipID;

  /** Thread id, possibly carrying the extra bit fields described by {@link Bits}. */
  private long threadID;

  /**
   * Modulus used by {@link #getRealThreadID(long)} and stride used by
   * {@link #createFakeThreadIDForBulkOp(int, long)} when building per-bucket fake ids.
   */
  public static final long MAX_THREAD_PER_CLIENT = 1000000L;

  /** Bucket count used in the threshold test of {@link #isParallelWANThreadID(long)}. */
  public static final int MAX_BUCKET_PER_PR = 1000;

  /** Mask selecting bits 32-63 of a thread id, i.e. everything above {@link Bits#THREAD_ID}. */
  public static final long WAN_BITS_MASK = 0xFFFFFFFF00000000L;

  /**
   * Generates thread ids for parallel wan usage.
   *
   * <p>
   * The constant's {@code ordinal()} is written into the {@link Bits#WAN_TYPE} field of generated
   * ids, so the declaration order below is part of the id format and must not be changed.
   */
  public enum WanType {
    RESERVED, // original thread id incl putAll (or old format)
    PRIMARY, // parallel new wan
    SECONDARY, // parallel new wan
    PARALLEL; // parallel old wan

    /**
     * Generates a new thread id for usage in a parallel wan context.
     *
     * <p>
     * The result is the bitwise OR of this type's ordinal, the offset and the gateway index, each
     * shifted into its {@link Bits} field, together with {@code threadId} as given. The
     * {@code threadId} is not masked, so any bits it has above bit 31 are merged into the upper
     * fields unchanged.
     *
     * @param threadId the original thread id
     * @param offset the thread offset
     * @param gatewayIndex the index of the gateway
     * @return the new thread id
     */
    public long generateWanId(long threadId, long offset, int gatewayIndex) {
      assert this != RESERVED;
      return Bits.WAN_TYPE.shift(ordinal()) | Bits.WAN.shift(offset)
          | Bits.GATEWAY_ID.shift(gatewayIndex) | threadId;
    }

    /**
     * Returns true if the supplied value is a wan thread identifier, that is, if its
     * {@link Bits#WAN_TYPE} field is greater than zero.
     *
     * @param tid the thread
     * @return true if the thread id is one of the wan types
     */
    public static boolean matches(long tid) {
      return Bits.WAN_TYPE.extract(tid) > 0;
    }
  }

  /**
   * Provides type-safe bitwise access to the threadID when dealing with generated values for wan id
   * generation.
   */
  public enum Bits {
    THREAD_ID(0, 32), // bits 0-31 thread id (including fake putAll bits)
    WAN(32, 16), // bits 32-47 wan thread index (or bucket for new wan)
    WAN_TYPE(48, 8), // bits 48-55 thread id type
    GATEWAY_ID(56, 7), // bits 56-62 gateway id (bit 63 would make the thread id negative)
    RESERVED(63, 1); // bit 63 unused

    /** the beginning bit position */
    private final int position;

    /** the field width */
    private final int width;

    Bits(int position, int width) {
      this.position = position;
      this.width = width;
    }

    /**
     * Returns the field bitmask, right-aligned (not shifted to the field position).
     *
     * @return the mask
     */
    public long mask() {
      return (1L << width) - 1;
    }

    /**
     * Returns the value shifted into the field position.
     *
     * <p>
     * With assertions enabled, a value larger than {@link #mask()} is rejected. Negative values
     * are not checked and are shifted as-is.
     *
     * @param val the value to shift
     * @return the shifted value
     */
    public long shift(long val) {
      assert val <= mask() : "Input value " + val + " is too large for " + this
          + " which has a maximum of " + mask();
      return val << position;
    }

    /**
     * Extracts the field bits from the value.
     *
     * @param val the value
     * @return the field, right-aligned
     */
    public long extract(long val) {
      // The mask discards any sign bits dragged in by the arithmetic shift.
      return (val >> position) & mask();
    }
  }

  /** Creates an empty identifier; the fields are populated by {@link #fromData(DataInput)}. */
  public ThreadIdentifier() {}

  /**
   * Creates an identifier from a membership id and a thread id.
   *
   * @param mid the membership id bytes; the array is stored by reference, not copied
   * @param threadId the thread id
   */
  public ThreadIdentifier(final byte[] mid, long threadId) {
    this.membershipID = mid;
    this.threadID = threadId;
  }

  /**
   * Two identifiers are equal when their thread ids match and
   * {@link EventID#equalMembershipIds(byte[], byte[])} accepts their membership ids.
   */
  @Override
  public boolean equals(Object obj) {
    // instanceof is already false for null, so no separate null check is needed.
    if (!(obj instanceof ThreadIdentifier)) {
      return false;
    }
    ThreadIdentifier other = (ThreadIdentifier) obj;
    return this.threadID == other.threadID
        && EventID.equalMembershipIds(this.membershipID, other.membershipID);
  }

  /** Combines the membership-id hash with both 32-bit halves of the thread id. */
  @Override
  public int hashCode() {
    final int mult = 37;
    int result = EventID.hashCodeMemberId(membershipID);
    result = mult * result + (int) this.threadID;
    result = mult * result + (int) (this.threadID >>> 32);
    return result;
  }

  /**
   * @return the membership id bytes (the internal array itself, not a copy); may be null
   */
  public byte[] getMembershipID() {
    return membershipID;
  }

  /**
   * @return the thread id, including any upper bit fields
   */
  public long getThreadID() {
    return threadID;
  }

  /**
   * Formats a thread id for display. If the id has any bits set above bit 31 the result is
   * {@code 0x<upper 32 bits in hex>|<lower 32 bits in decimal>}; otherwise it is just the decimal
   * value.
   *
   * @param tid the thread id
   * @return the display form
   */
  public static String toDisplayString(long tid) {
    StringBuilder sb = new StringBuilder();
    long lower = Bits.THREAD_ID.extract(tid);
    if (lower != tid) {
      sb.append("0x");
      // Unsigned shift: print exactly the upper 32 bits, without sign extension when bit 63 is set.
      sb.append(Long.toHexString(tid >>> Bits.THREAD_ID.width));
      sb.append("|");
    }
    sb.append(lower);
    return sb.toString();
  }

  /**
   * Returns a cheap description: the membership id length and the displayed thread id. Safe to call
   * on an instance whose membership id has not been set yet.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("ThreadId[");
    sb.append("id=");
    if (membershipID == null) {
      // A no-arg-constructed instance has no membership id until fromData is called.
      sb.append("null");
    } else {
      sb.append(membershipID.length).append("bytes");
    }
    sb.append("; ");
    sb.append(toDisplayString(threadID));
    sb.append("]");
    return sb.toString();
  }

  /**
   * Returns a description that attempts to decode the membership id through
   * {@link InternalDistributedMember#readEssentialData}. If decoding fails for any reason the raw
   * bytes are printed instead.
   *
   * @return the detailed description
   */
  public String expensiveToString() {
    Object mbr;
    try {
      mbr = InternalDistributedMember
          .readEssentialData(new ByteArrayDataInput(membershipID));
    } catch (Exception e) {
      // Best effort only: punt and show the byte values rather than the array's identity hash.
      mbr = Arrays.toString(membershipID);
    }
    return "ThreadId[" + mbr + "; thread " + toDisplayString(threadID) + "]";
  }

  /**
   * convert fake thread id into real thread id
   *
   * @param tid thread id
   * @return the lower 32 bits of {@code tid} modulo {@link #MAX_THREAD_PER_CLIENT}
   */
  public static long getRealThreadID(long tid) {
    return Bits.THREAD_ID.extract(tid) % MAX_THREAD_PER_CLIENT;
  }

  /**
   * convert fake thread id into real thread id including WAN id
   *
   * @param tid thread id
   * @return the real thread id combined with the unchanged upper 32 bits of {@code tid}
   */
  public static long getRealThreadIDIncludingWan(long tid) {
    return getRealThreadID(tid) | (tid & WAN_BITS_MASK);
  }

  /**
   * check if current thread id is a fake thread id for putAll
   *
   * @param tid thread id
   * @return whether the lower 32 bits of the id are at least {@link #MAX_THREAD_PER_CLIENT}
   */
  public static boolean isPutAllFakeThreadID(long tid) {
    return Bits.THREAD_ID.extract(tid) / MAX_THREAD_PER_CLIENT > 0;
  }

  /**
   * check if current thread id is generated by ParallelWAN
   *
   * <p>
   * True when the id carries a wan type, or when the whole id divided by
   * {@link #MAX_THREAD_PER_CLIENT} exceeds {@code MAX_BUCKET_PER_PR + 2}.
   *
   * @param tid thread id
   * @return whether the thread id is generated by ParallelGatewaySender
   */
  public static boolean isParallelWANThreadID(long tid) {
    return WanType.matches(tid) || tid / MAX_THREAD_PER_CLIENT > (MAX_BUCKET_PER_PR + 2);
  }

  /**
   * Checks if the input thread id is a WAN_TYPE thread id
   *
   * @param tid thread id
   * @return whether the input thread id is a WAN_TYPE thread id
   */
  public static boolean isWanTypeThreadID(long tid) {
    return WanType.matches(tid);
  }

  /**
   * create a fake id for an operation on the given bucket
   *
   * @param bucketNumber the bucket number
   * @param originatingThreadId the thread id the fake id is derived from
   * @return {@code MAX_THREAD_PER_CLIENT * (bucketNumber + 1) + originatingThreadId}
   */
  public static long createFakeThreadIDForBulkOp(int bucketNumber, long originatingThreadId) {
    return (MAX_THREAD_PER_CLIENT * (bucketNumber + 1) + originatingThreadId);
  }

  /**
   * create a fake id of type {@link WanType#PRIMARY} for an operation on the given bucket
   *
   * @param bucketId the bucket id, stored in the {@link Bits#WAN} field
   * @param originatingThreadId the thread id the fake id is derived from
   * @param gatewayIndex the index of the gateway
   * @return the fake id
   */
  public static long createFakeThreadIDForParallelGSPrimaryBucket(int bucketId,
      long originatingThreadId, int gatewayIndex) {
    return WanType.PRIMARY.generateWanId(originatingThreadId, bucketId, gatewayIndex);
  }

  /**
   * create a fake id of type {@link WanType#SECONDARY} for an operation on the given bucket
   *
   * @param bucketId the bucket id, stored in the {@link Bits#WAN} field
   * @param originatingThreadId the thread id the fake id is derived from
   * @param gatewayIndex the index of the gateway
   * @return the fake id
   */
  public static long createFakeThreadIDForParallelGSSecondaryBucket(int bucketId,
      long originatingThreadId, int gatewayIndex) {
    return WanType.SECONDARY.generateWanId(originatingThreadId, bucketId, gatewayIndex);
  }

  /**
   * create a fake id of type {@link WanType#PARALLEL} for the given index
   *
   * @param index the value stored in the {@link Bits#WAN} field
   * @param originatingThreadId the thread id the fake id is derived from
   * @param gatewayIndex the index of the gateway
   * @return the fake id
   */
  public static long createFakeThreadIDForParallelGateway(int index, long originatingThreadId,
      int gatewayIndex) {
    return WanType.PARALLEL.generateWanId(originatingThreadId, index, gatewayIndex);
  }

  /**
   * checks to see if the membership id of this identifier is the same as in the argument
   *
   * <p>
   * This is a byte-for-byte comparison via {@link Arrays#equals(byte[], byte[])}, unlike
   * {@link #equals(Object)}, which delegates to {@code EventID.equalMembershipIds}.
   *
   * @param other the identifier to compare with; must not be null
   * @return whether the two IDs are from the same member
   */
  public boolean isSameMember(ThreadIdentifier other) {
    return Arrays.equals(this.membershipID, other.membershipID);
  }

  /** Reads the membership id byte array followed by the thread id. */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    membershipID = DataSerializer.readByteArray(in);
    threadID = in.readLong();
  }

  /** Writes the membership id byte array followed by the thread id. */
  @Override
  public void toData(DataOutput out) throws IOException {
    DataSerializer.writeByteArray(membershipID, out);
    out.writeLong(threadID);
  }
}