blob: 4c46106cce7aa84a32d117ebaa5819e0763147e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* This class represents a ConnectionProxy of the CacheClient
*
*
*
*/
public class ClientProxyMembershipID
implements DataSerializableFixedID, Serializable, Externalizable {
public static final long serialVersionUID = 7144300815346556370L;
private static final Logger logger = LogService.getLogger();
private static final ThreadLocal<String> POOL_NAME = new ThreadLocal<String>();
public static void setPoolName(String poolName) {
POOL_NAME.set(poolName);
}
public static String getPoolName() {
return POOL_NAME.get();
}
private static final int BYTES_32KB = 32768;
@MakeNotStatic
public static volatile DistributedSystem system = null;
/**
* the membership id of the distributed system in this client (if running in a client)
*/
@MakeNotStatic
public static DistributedMember systemMemberId;
// durable_synch_counter=1 is reserved for durable clients
// so that when pools are being created and deleted the same client
// session is selected on the serverside by always using the
// same uniqueID value which is set via the synch_counter
private static final int durable_synch_counter = 1;
@MakeNotStatic
private static int synch_counter = 0;
protected byte[] identity;
/** cached membership identifier */
private transient DistributedMember memberId;
/** cached tostring of the memberID */
private transient String memberIdString;
protected int uniqueId;
@Override
public int hashCode() {
int result = 17;
final int mult = 37;
if (isDurable()) {
result = mult * result + getDurableId().hashCode();
} else {
if (this.identity != null && this.identity.length > 0) {
for (int i = 0; i < this.identity.length; i++) {
result = mult * result + this.identity[i];
}
}
}
// we can't use unique_id in hashCode
// because of HandShake's hashCode using our HashCode but
// its equals using our isSameDSMember which ignores unique_id
// result = mult * result + this.unique_id;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if ((obj == null) || !(obj instanceof ClientProxyMembershipID)) {
return false;
}
ClientProxyMembershipID that = (ClientProxyMembershipID) obj;
if (this.uniqueId != that.uniqueId) {
return false;
}
boolean isDurable = this.isDurable();
if (isDurable && !that.isDurable()) {
return false;
}
if (isDurable) {
return this.getDurableId().equals(that.getDurableId());
}
return Arrays.equals(this.identity, that.identity);
}
/**
* Return true if "that" can be used in place of "this" when canonicalizing.
*/
private boolean isCanonicalEquals(ClientProxyMembershipID that) {
if (this == that) {
return true;
}
if (this.uniqueId != that.uniqueId) {
return false;
}
return Arrays.equals(this.identity, that.identity);
}
boolean isSameDSMember(ClientProxyMembershipID that) {
if (that != null) {
// Test whether:
// - the durable ids are equal (if durable) or
// - the identities are equal (if non-durable)
return isDurable() ? this.getDurableId().equals(that.getDurableId())
: Arrays.equals(this.identity, that.identity);
} else {
return false;
}
}
/** method to obtain ClientProxyMembership for client side */
public static synchronized ClientProxyMembershipID getNewProxyMembership(DistributedSystem sys) {
byte[] ba = initializeAndGetDSIdentity(sys);
return new ClientProxyMembershipID(++synch_counter, ba);
}
public static ClientProxyMembershipID getClientId(DistributedMember member) {
return new ClientProxyMembershipID(member);
}
public static byte[] initializeAndGetDSIdentity(DistributedSystem sys) {
byte[] client_side_identity = null;
if (sys == null) {
// DistributedSystem is required now before handshaking -Kirk
throw new IllegalStateException(
"Attempting to handshake with CacheServer before creating DistributedSystem and Cache.");
}
{
systemMemberId = sys.getDistributedMember();
try {
if (systemMemberId != null) {
// update the durable id of the member identifier before serializing in case
// a pool name has been established
DurableClientAttributes attributes = systemMemberId.getDurableClientAttributes();
if (attributes != null && attributes.getId().length() > 0) {
((InternalDistributedMember) systemMemberId).setDurableId(attributes.getId());
}
}
HeapDataOutputStream hdos = new HeapDataOutputStream(256, Version.CURRENT);
DataSerializer.writeObject(systemMemberId, hdos);
client_side_identity = hdos.toByteArray();
} catch (IOException ioe) {
throw new InternalGemFireException(
"Unable to serialize identity",
ioe);
}
system = sys;
}
return client_side_identity;
}
private ClientProxyMembershipID(int id, byte[] clientSideIdentity) {
boolean specialCase = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "SPECIAL_DURABLE");
String durableID = this.system.getProperties().getProperty(DURABLE_CLIENT_ID);
if (specialCase && durableID != null && (!durableID.equals(""))) {
this.uniqueId = durable_synch_counter;
} else {
this.uniqueId = id;
}
this.identity = clientSideIdentity;
this.memberId = systemMemberId;
}
public ClientProxyMembershipID() {}
public ClientProxyMembershipID(DistributedMember member) {
this.uniqueId = 1;
this.memberId = member;
updateID(member);
}
private transient String _toString;
// private transient int transientPort; // variable for debugging member ID issues
@Override
public String toString() {
if (this.identity != null
&& ((InternalDistributedMember) getDistributedMember()).getMembershipPort() == 0) {
return this.toStringNoCache();
}
if (this._toString == null) {
this._toString = this.toStringNoCache();
}
return this._toString;
}
/**
* returns a string representation of this identifier, ignoring the toString cache
*/
public String toStringNoCache() {
StringBuffer sb = new StringBuffer("identity(").append(getDSMembership()).append(",connection=")
.append(uniqueId);
if (identity != null) {
DurableClientAttributes dca = getDurableAttributes();
if (dca.getId().length() > 0) {
sb.append(",durableAttributes=").append(dca).append(')').toString();
}
}
return sb.toString();
}
/**
* For Externalizable
*
* @see Externalizable
*/
@Override
public void writeExternal(ObjectOutput out) throws IOException {
// if (this.transientPort == 0) {
// InternalDistributedSystem.getLogger().warning(
// String.format("%s",
// "externalizing a client ID with zero port: " + this.toString(),
// new Exception("Stack trace")));
// }
Assert.assertTrue(this.identity.length <= BYTES_32KB);
out.writeShort(this.identity.length);
out.write(this.identity);
out.writeInt(this.uniqueId);
}
/** returns the externalized size of this object */
public int getSerializedSize() {
return 4 + identity.length + 4;
}
/**
* For Externalizable
*
* @see Externalizable
*/
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
int identityLength = in.readShort();
if (identityLength > BYTES_32KB) {
throw new IOException(
"HandShake identity length is too big");
}
this.identity = new byte[identityLength];
read(in, this.identity);
this.uniqueId = in.readInt();
if (this.uniqueId == -1) {
throw new IOException(
"Unexpected EOF reached. Unique ID could not be read");
}
// {toString(); this.transientPort = ((InternalDistributedMember)this.memberId).getPort();}
}
private void read(ObjectInput dis, byte[] toFill) throws IOException {
int idBytes = 0;
int toFillLength = toFill.length;
while (idBytes < toFillLength) {
// idBytes += dis.read(toFill, idBytes, (toFillLength - idBytes));
int dataRead = dis.read(toFill, idBytes, (toFillLength - idBytes));
if (dataRead == -1) {
throw new IOException(
"Unexpected EOF reached. Distributed MembershipID could not be read");
}
idBytes += dataRead;
}
}
@Override
public int getDSFID() {
return CLIENT_PROXY_MEMBERSHIPID;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
// if (this.transientPort == 0) {
// InternalDistributedSystem.getLogger().warning(
// String.format("%s",
// "serializing a client ID with zero port: " + this.toString(),
// new Exception("Stack trace")));
// }
DataSerializer.writeByteArray(this.identity, out);
out.writeInt(this.uniqueId);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
this.identity = DataSerializer.readByteArray(in);
this.uniqueId = in.readInt();
// {toString(); this.transientPort = ((InternalDistributedMember)this.memberId).getPort();}
}
public Version getClientVersion() {
return ((InternalDistributedMember) getDistributedMember()).getVersionObject();
}
public String getDSMembership() {
if (identity == null) {
// some unit tests create IDs that have no real identity, so return null
return "null";
}
// don't cache if we haven't connected to the server yet
if (((InternalDistributedMember) getDistributedMember()).getMembershipPort() == 0) {
return getMemberIdAsString();
}
if (memberIdString == null) {
memberIdString = getMemberIdAsString();
}
return memberIdString;
}
private String getMemberIdAsString() {
String memberIdAsString = null;
InternalDistributedMember idm = (InternalDistributedMember) getDistributedMember();
if (getClientVersion().compareTo(Version.GFE_90) < 0) {
memberIdAsString = idm.toString();
} else {
StringBuilder sb = new StringBuilder();
idm.addFixedToString(sb);
memberIdAsString = sb.toString();
}
return memberIdAsString;
}
/**
* this method uses CacheClientNotifier to try to obtain an ID that is equal to this one. This is
* used during deserialization to reduce storage overhead.
*/
private ClientProxyMembershipID canonicalReference() {
CacheClientNotifier ccn = CacheClientNotifier.getInstance();
if (ccn != null) {
CacheClientProxy cp = ccn.getClientProxy(this, true);
if (cp != null) {
if (this.isCanonicalEquals(cp.getProxyID())) {
return cp.getProxyID();
}
}
}
return this;
}
/**
* deserializes the membership id, if necessary, and returns it. All access to membershipId should
* be through this method
*/
public DistributedMember getDistributedMember() {
if (memberId == null) {
ByteArrayDataInput dataInput = new ByteArrayDataInput(identity);
try {
memberId = (DistributedMember) DataSerializer.readObject(dataInput);
} catch (Exception e) {
logger.error("Unable to deserialize membership id", e);
}
}
return memberId;
}
/** Returns the byte-array for membership identity */
byte[] getMembershipByteArray() {
return this.identity;
}
/**
* Returns whether this <code>ClientProxyMembershipID</code> is durable.
*
* @return whether this <code>ClientProxyMembershipID</code> is durable
*
* @since GemFire 5.5
*/
public boolean isDurable() {
String durableClientId = getDistributedMember().getDurableClientAttributes().getId();
return durableClientId != null && !(durableClientId.length() == 0);
}
/**
* Returns this <code>ClientProxyMembershipID</code>'s durable attributes.
*
* @return this <code>ClientProxyMembershipID</code>'s durable attributes
*
* @since GemFire 5.5
*/
protected DurableClientAttributes getDurableAttributes() {
return getDistributedMember().getDurableClientAttributes();
}
/**
* Returns this <code>ClientProxyMembershipID</code>'s durable id.
*
* @return this <code>ClientProxyMembershipID</code>'s durable id
*
* @since GemFire 5.5
*/
public String getDurableId() {
DurableClientAttributes dca = getDurableAttributes();
return dca == null ? "" : dca.getId();
}
/**
* Returns this <code>ClientProxyMembershipID</code>'s durable timeout.
*
* @return this <code>ClientProxyMembershipID</code>'s durable timeout
*
* @since GemFire 5.5
*/
protected int getDurableTimeout() {
DurableClientAttributes dca = getDurableAttributes();
return dca == null ? 0 : dca.getTimeout();
}
/**
* Used to update the timeout when a durable client comes back to a server
*/
public void updateDurableTimeout(int newValue) {
InternalDistributedMember member = (InternalDistributedMember) getDistributedMember();
member.setDurableTimeout(newValue);
}
/**
* call this when the distributed system ID has been modified
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
justification = "Only applicable in client DS and in that case too multiple instances do not modify it at the same time.")
public void updateID(DistributedMember idm) {
// this.transientPort = ((InternalDistributedMember)this.memberId).getPort();
// if (this.transientPort == 0) {
// InternalDistributedSystem.getLogger().warning(
// String.format("%s",
// "updating client ID when member port is zero: " + this.memberId,
// new Exception("stack trace")
// ));
// }
HeapDataOutputStream hdos = new HeapDataOutputStream(256, Version.CURRENT);
try {
DataSerializer.writeObject(idm, hdos);
} catch (IOException e) {
throw new InternalGemFireException("Unable to serialize member: " + this.memberId, e);
}
this.identity = hdos.toByteArray();
if (this.memberId != null && this.memberId == systemMemberId) {
systemMemberId = idm;
// client_side_identity = this.identity;
}
this.memberId = idm;
this._toString = null; // make sure we don't retain the old ID representation in toString
}
/**
* Return the name of the <code>HARegion</code> queueing this proxy's messages. This is name is
* generated based on whether or not this proxy id is durable. If this proxy id is durable, then
* the durable client id is used. If this proxy id is not durable, then
* the<code>DistributedMember</code> string is used.
*
* @return the name of the <code>HARegion</code> queueing this proxy's messages.
*
* @since GemFire 5.5
*/
protected String getHARegionName() {
return getBaseRegionName() + "_queue";
}
/**
* Return the name of the region used for communicating interest changes between servers.
*
* @return the name of the region used for communicating interest changes between servers
*
* @since GemFire 5.6
*/
protected String getInterestRegionName() {
return getBaseRegionName() + "_interest";
}
private String getBaseRegionName() {
String id = isDurable() ? getDurableId() : getDSMembership();
if (id.indexOf('/') >= 0) {
id = id.replace('/', ':');
}
StringBuffer buffer = new StringBuffer().append("_gfe_").append(isDurable() ? "" : "non_")
.append("durable_client_").append("with_id_" + id).append("_").append(this.uniqueId);
return buffer.toString();
}
/**
* Resets the unique id counter. This is done for durable clients that stops/starts its cache.
* When it restarts its cache, it needs to maintain the same unique id
*
* @since GemFire 5.5
*/
public static synchronized void resetUniqueIdCounter() {
synch_counter = 0;
}
public Identity getIdentity() {
return new Identity();
}
/**
* Used to represent a unique identity of this ClientProxyMembershipID. It does this by ignoring
* the durable id and only respecting the unique_id and identity.
* <p>
* This class is used to clean up resources associated with a particular client and thus does not
* want to limit itself to the durable id.
*
* @since GemFire 5.7
*/
public class Identity {
public int getUniqueId() {
return uniqueId;
}
public byte[] getMemberIdBytes() {
return identity;
}
@Override
public int hashCode() {
int result = 17;
final int mult = 37;
byte[] idBytes = getMemberIdBytes();
if (idBytes != null && idBytes.length > 0) {
for (int i = 0; i < idBytes.length; i++) {
result = mult * result + idBytes[i];
}
}
result = mult * result + uniqueId;
return result;
}
@Override
public boolean equals(Object obj) {
if ((obj == null) || !(obj instanceof ClientProxyMembershipID.Identity)) {
return false;
}
ClientProxyMembershipID.Identity that = (ClientProxyMembershipID.Identity) obj;
return (getUniqueId() == that.getUniqueId()
&& Arrays.equals(getMemberIdBytes(), that.getMemberIdBytes()));
}
public ClientProxyMembershipID getClientProxyID() {
return ClientProxyMembershipID.this;
}
}
@Override
public Version[] getSerializationVersions() {
return null;
}
public static ClientProxyMembershipID readCanonicalized(DataInput in)
throws IOException, ClassNotFoundException {
ClientProxyMembershipID result = DataSerializer.readObject(in);
// We can't canonicalize if we have no identity.
// I only saw this happen in unit tests that serialize "new ClientProxyMembershipID()".
if (result == null || result.identity == null) {
return result;
}
return result.canonicalReference();
}
}