blob: fd97d69cc4da5e87812de3fcfc6d4ea4b9687010 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.locks;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* A processor for telling the grantor that a lock service participant has shutdown. The grantor
* should release all locks which are currently held by the calling member.
*
* @since GemFire 4.0
*/
public class NonGrantorDestroyedProcessor extends ReplyProcessor21 {
private static final Logger logger = LogService.getLogger();
private NonGrantorDestroyedReplyMessage reply;
////////// Public static entry point /////////
/**
*
* Send a message to grantor telling it that we've shutdown the named lock service for this
* member.
* <p>
* Caller should loop, getting the grantor, calling <code>send</code>, and checking
* <code>informedGrantor()</code> until the grantor has acknowledged being informed.
*/
static boolean send(String serviceName, LockGrantorId theLockGrantorId, DistributionManager dm) {
InternalDistributedMember recipient = theLockGrantorId.getLockGrantorMember();
NonGrantorDestroyedProcessor processor = new NonGrantorDestroyedProcessor(dm, recipient);
NonGrantorDestroyedMessage.send(serviceName, recipient, dm, processor);
try {
processor.waitForRepliesUninterruptibly();
} catch (ReplyException e) {
e.handleCause();
}
return processor.informedGrantor();
}
//////////// Instance methods //////////////
/** Creates a new instance of NonGrantorDestroyedProcessor */
private NonGrantorDestroyedProcessor(DistributionManager dm, InternalDistributedMember grantor) {
super(dm, grantor);
}
@Override
public void process(DistributionMessage msg) {
try {
Assert.assertTrue(msg instanceof NonGrantorDestroyedReplyMessage,
"NonGrantorDestroyedProcessor is unable to process message of type " + msg.getClass());
this.reply = (NonGrantorDestroyedReplyMessage) msg;
} finally {
super.process(msg);
}
}
/** Returns true if the grantor acknowledged the msg with OK */
public boolean informedGrantor() {
return this.reply != null && this.reply.isOK();
}
@Override
protected boolean allowReplyFromSender() {
return true;
}
/////////////// Inner message classes //////////////////
public static class NonGrantorDestroyedMessage extends PooledDistributionMessage
implements MessageWithReply {
private int processorId;
/** The name of the DistributedLockService */
private String serviceName;
protected static void send(String serviceName, InternalDistributedMember grantor,
DistributionManager dm, ReplyProcessor21 proc) {
Assert.assertTrue(grantor != null, "Cannot send NonGrantorDestroyedMessage to null grantor");
NonGrantorDestroyedMessage msg = new NonGrantorDestroyedMessage();
msg.serviceName = serviceName;
msg.processorId = proc.getProcessorId();
msg.setRecipient(grantor);
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE, "NonGrantorDestroyedMessage sending {} to {}", msg,
grantor);
}
if (grantor.equals(dm.getId())) {
msg.setSender(dm.getId());
msg.processLocally(dm);
} else {
dm.putOutgoing(msg);
}
}
@Override
public int getProcessorId() {
return this.processorId;
}
private void reply(byte replyCode, DistributionManager dm) {
NonGrantorDestroyedReplyMessage.send(this, replyCode, dm);
}
@Override
protected void process(ClusterDistributionManager dm) {
basicProcess(dm);
}
/** Process locally without using messaging */
protected void processLocally(final DistributionManager dm) {
basicProcess(dm);
}
/** Perform basic processing of this message */
private void basicProcess(final DistributionManager dm) {
boolean replied = false;
try {
DLockService svc = DLockService.getInternalServiceNamed(this.serviceName);
if (svc != null && svc.isCurrentlyOrIsMakingLockGrantor()) {
DLockGrantor grantor = DLockGrantor.waitForGrantor(svc);
if (grantor != null) {
grantor.handleDepartureOf(getSender());
if (!grantor.isDestroyed()) {
reply(NonGrantorDestroyedReplyMessage.OK, dm);
replied = true;
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE,
"Processing of NonGrantorDestroyedMessage resulted in InterruptedException", e);
}
} catch (LockServiceDestroyedException e) {
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE,
"Processing of NonGrantorDestroyedMessage resulted in LockServiceDestroyedException",
e);
}
} catch (LockGrantorDestroyedException e) {
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE,
"Processing of NonGrantorDestroyedMessage resulted in LockGrantorDestroyedException",
e);
}
} finally {
if (!replied) {
reply(NonGrantorDestroyedReplyMessage.NOT_GRANTOR, dm);
}
}
}
@Override
public int getDSFID() {
return NON_GRANTOR_DESTROYED_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.processorId = in.readInt();
this.serviceName = DataSerializer.readString(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.processorId);
DataSerializer.writeString(this.serviceName, out);
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer();
buff.append("NonGrantorDestroyedMessage (serviceName='").append(this.serviceName)
.append("' processorId=").append(this.processorId).append(")");
return buff.toString();
}
}
public static class NonGrantorDestroyedReplyMessage extends ReplyMessage {
public static final byte OK = 0;
public static final byte NOT_GRANTOR = 1;
private byte replyCode;
public static void send(MessageWithReply destroyedMsg, byte replyCode, DistributionManager dm) {
NonGrantorDestroyedReplyMessage m = new NonGrantorDestroyedReplyMessage();
m.processorId = destroyedMsg.getProcessorId();
m.setRecipient(destroyedMsg.getSender());
m.replyCode = replyCode;
if (dm.getId().equals(destroyedMsg.getSender())) {
m.setSender(destroyedMsg.getSender());
m.dmProcess(dm);
} else {
dm.putOutgoing(m);
}
}
public boolean isOK() {
return this.replyCode == OK;
}
public static String replyCodeToString(int replyCode) {
String s = null;
switch (replyCode) {
case OK:
s = "OK";
break;
case NOT_GRANTOR:
s = "NOT_GRANTOR";
break;
default:
s = "UNKNOWN:" + String.valueOf(replyCode);
break;
}
return s;
}
@Override
public int getDSFID() {
return NON_GRANTOR_DESTROYED_REPLY_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.replyCode = in.readByte();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeByte(this.replyCode);
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer();
buff.append("NonGrantorDestroyedReplyMessage").append("; sender=").append(getSender())
.append("; processorId=").append(super.processorId).append("; replyCode=")
.append(replyCodeToString(this.replyCode)).append(")");
return buff.toString();
}
}
}