| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.admin.remote; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.InternalLocator; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.ReplyMessage; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.tcp.ConnectionTable; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * An instruction to all members with cache that their PR should gracefully close and disconnect DS |
| */ |
| public class ShutdownAllRequest extends AdminRequest { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
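  // how long (in milliseconds) to pause before disconnecting the distributed system; may be
  // overridden with the "gemfire.sleep-before-disconnect-ds" system property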
| private static final long SLEEP_TIME_BEFORE_DISCONNECT_DS = |
| Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "sleep-before-disconnect-ds", 1000); |
| |
| public ShutdownAllRequest() { |
| // do nothing |
| } |
| |
| /** |
| * Sends a shutdownAll request to all other members and performs local shutdownAll processing in |
| * the waitingThreadPool. |
| */ |
| public static Set send(final DistributionManager dm, long timeout) { |
| boolean hadCache = hasCache(dm); |
| ClusterDistributionManager dism = |
| dm instanceof ClusterDistributionManager ? (ClusterDistributionManager) dm : null; |
| InternalDistributedMember myId = dm.getDistributionManagerId(); |
| |
| Set recipients = dm.getOtherNormalDistributionManagerIds(); |
| |
| recipients.remove(myId); |
| |
    // build and send the shutdownAll request
| ShutdownAllRequest request = new ShutdownAllRequest(); |
| request.setRecipients(recipients); |
| |
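    // register a reply processor so that responses and member departures from each recipient
    // can be collected and matched to this request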
| ShutDownAllReplyProcessor replyProcessor = new ShutDownAllReplyProcessor(dm, recipients); |
| request.msgId = replyProcessor.getProcessorId(); |
| dm.putOutgoing(request); |
| |
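    // skip local processing on a dedicated locator; otherwise run the shutdownAll locally on the
    // calling thread and feed the response to the reply processor as if it came from a peer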
| if (!InternalLocator.isDedicatedLocator()) { |
| if (hadCache && dism != null) { |
| AdminResponse response; |
| try { |
| request.setSender(myId); |
| response = request.createResponse(dism); |
| } catch (Exception ex) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("caught exception while processing shutdownAll locally", ex); |
| } |
| response = AdminFailureResponse.create(myId, ex); |
| } |
| response.setSender(myId); |
| replyProcessor.process(response); |
| } |
| } |
| |
| boolean interrupted = false; |
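    // wait for every recipient to reply or depart; a timeout is reported to the caller as null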
| try { |
| if (!replyProcessor.waitForReplies(timeout)) { |
| return null; |
| } |
| } catch (ReplyException e) { |
| if (!(e.getCause() instanceof CancelException)) { |
| e.handleCause(); |
| } |
| } catch (CancelException ignore) { |
| // expected |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| } |
| |
    // all recipients have now replied (or departed); shut down this member itself, unless it is a
    // dedicated locator
    if (hadCache) {
      // at this point GemFireCacheImpl.getInstance() might return null, because the cache was
      // already closed by the shutDownAll() call
| if (!InternalLocator.isDedicatedLocator()) { |
| InternalDistributedSystem ids = dm.getSystem(); |
| if (ids.isConnected()) { |
| ids.disconnect(); |
| } |
| } |
| } |
| |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| |
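    // pause briefly so the remote members have a chance to start disconnecting before this
    // method returns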
| try { |
| Thread.sleep(3 * SLEEP_TIME_BEFORE_DISCONNECT_DS); |
| } catch (InterruptedException ignore) { |
| } |
| return replyProcessor.getResults(); |
| } |
| |
| @Override |
| public boolean sendViaUDP() { |
| return true; |
| } |
| |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| boolean isToShutdown = hasCache(dm); |
| super.process(dm); |
| |
| if (isToShutdown) { |
      // Do the disconnect in an async thread. The current thread is one of the dm's pool threads,
      // so calling disconnect from it would prevent the dm from cleaning up all of its threads
      // and cause a 20 second delay.
| final InternalDistributedSystem ids = dm.getSystem(); |
| if (ids.isConnected()) { |
| Thread t = new LoggingThread("ShutdownAllRequestDisconnectThread", false, () -> { |
| try { |
| Thread.sleep(SLEEP_TIME_BEFORE_DISCONNECT_DS); |
| } catch (InterruptedException ignore) { |
| } |
| ConnectionTable.threadWantsSharedResources(); |
| if (ids.isConnected()) { |
| ids.disconnect(); |
| } |
| }); |
| t.start(); |
| } |
| } |
| } |
| |
| private static boolean hasCache(DistributionManager manager) { |
| InternalCache cache = manager.getCache(); |
| return cache != null && !cache.isClosed(); |
| } |
| |
| @Override |
| protected AdminResponse createResponse(DistributionManager dm) { |
| boolean isToShutdown = hasCache(dm); |
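    // only a member that still hosts an open cache shuts anything down; the flag is echoed back
    // in the response so the sender knows whether to wait for this member's departure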
| if (isToShutdown) { |
| boolean isSuccess = false; |
| try { |
| dm.getCache().shutDownAll(); |
| isSuccess = true; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| |
| if (t instanceof InternalGemFireError) { |
| logger.fatal("DistributedSystem is closed due to InternalGemFireError", t); |
| } else { |
| logger.fatal("DistributedSystem is closed due to unexpected exception", t); |
| } |
| } finally { |
| if (!isSuccess) { |
| InternalDistributedMember me = dm.getDistributionManagerId(); |
| InternalDistributedSystem ids = dm.getSystem(); |
| if (!this.getSender().equals(me)) { |
| if (ids.isConnected()) { |
| logger.fatal("ShutdownAllRequest: disconnect distributed without response."); |
| ids.disconnect(); |
| } |
| } |
| } |
| } |
| } |
| |
| return new ShutdownAllResponse(this.getSender(), isToShutdown); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return SHUTDOWN_ALL_REQUEST; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| } |
| |
| @Override |
| public String toString() { |
| return "ShutdownAllRequest sent to " + Arrays.toString(this.getRecipients()) + " from " |
| + this.getSender(); |
| } |
| |
| private static class ShutDownAllReplyProcessor extends AdminMultipleReplyProcessor { |
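    // members that have acknowledged the shutdown; access is synchronized because replies can
    // arrive on multiple threads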
| Set<DistributedMember> results = Collections.synchronizedSet(new TreeSet<>()); |
| |
| ShutDownAllReplyProcessor(DistributionManager dm, Collection initMembers) { |
| super(dm, initMembers); |
| } |
| |
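    // returning false keeps the processor waiting for the remaining members even if a reply
    // carries an exception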
| @Override |
| protected boolean stopBecauseOfExceptions() { |
| return false; |
| } |
| |
| /** |
| * If response arrives, we will save into results and keep wait for member's departure. If the |
| * member is departed before sent response, no wait for its response |
| */ |
| @Override |
| public void process(DistributionMessage msg) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("shutdownAll reply processor is processing {}", msg); |
| } |
| if (msg instanceof ShutdownAllResponse) { |
| if (((ShutdownAllResponse) msg).isToShutDown()) { |
| synchronized (results) { |
| logger.debug("{} adding {} to result set {}", this, msg.getSender(), |
| results); |
| this.results.add(msg.getSender()); |
| } |
| } else { |
          // a member without a cache has nothing to shut down, so do not wait for its result
          // or for its DS to close
| removeMember(msg.getSender(), false); |
| } |
| |
| if (msg.getSender().equals(this.dmgr.getDistributionManagerId())) { |
| // mark myself as done since my response has been sent and my DS |
| // will be closed later anyway |
| removeMember(msg.getSender(), false); |
| } |
| } |
| |
| if (msg instanceof ReplyMessage) { |
| ReplyException ex = ((ReplyMessage) msg).getException(); |
| if (ex != null) { |
| processException(msg, ex); |
| } |
| } |
| |
| checkIfDone(); |
| } |
| |
    public Set<DistributedMember> getResults() {
      synchronized (results) {
        logger.debug("{} shutdownAll returning {}", this, results);
        return new HashSet<>(results);
| } |
| } |
| } |
| } |