| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.admin.remote; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.FutureTask; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.IncompatibleSystemException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.admin.OperationCancelledException; |
| import org.apache.geode.admin.RuntimeAdminException; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem.ReconnectListener; |
| import org.apache.geode.distributed.internal.MembershipListener; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.admin.Alert; |
| import org.apache.geode.internal.admin.AlertListener; |
| import org.apache.geode.internal.admin.ApplicationVM; |
| import org.apache.geode.internal.admin.CacheCollector; |
| import org.apache.geode.internal.admin.CacheSnapshot; |
| import org.apache.geode.internal.admin.GemFireVM; |
| import org.apache.geode.internal.admin.GfManagerAgent; |
| import org.apache.geode.internal.admin.GfManagerAgentConfig; |
| import org.apache.geode.internal.admin.JoinLeaveListener; |
| import org.apache.geode.internal.logging.InternalLogWriter; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.LogWriterFactory; |
| import org.apache.geode.internal.logging.LoggingThread; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.security.AuthenticationFailedException; |
| |
| /** |
| * An implementation of <code>GfManagerAgent</code> that uses a {@link ClusterDistributionManager} |
| * to communicate with other members of the distributed system. Because it is a |
| * <code>MembershipListener</code> it is alerted when members join and leave the distributed system. |
| * It also implements support for {@link JoinLeaveListener}s as well suport for collecting and |
| * collating the pieces of a {@linkplain CacheCollector cache snapshot}. |
| */ |
| public |
| // Note that since we export the instances in a public list, |
| // I'm not permitting subclasses |
| class RemoteGfManagerAgent implements GfManagerAgent { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| // instance variables |
| |
| /** |
| * The connection to the distributed system through which this admin agent communicates |
| * |
| * @since GemFire 4.0 |
| */ |
| protected volatile InternalDistributedSystem system; |
| private final Object systemLock = new Object(); |
| |
| /** Is this agent connected to the distributed system */ |
| protected volatile boolean connected = false; |
| |
| /** Is this agent listening for messages from the distributed system? */ |
| private volatile boolean listening = true; |
| |
| /** |
| * A daemon thread that continuously attempts to connect to the distributed system. |
| */ |
| private DSConnectionDaemon daemon; |
| |
| /** An alert listener that receives alerts */ |
| private final AlertListener alertListener; |
| |
| /** The level at which alerts are logged */ |
| protected /* final */ int alertLevel; |
| |
| /** The transport configuration used to connect to the distributed system */ |
| private final RemoteTransportConfig transport; |
| |
| /** |
| * Optional display name used for |
| * {@link org.apache.geode.distributed.DistributedSystem#getName()}. |
| */ |
| private final String displayName; |
| |
| /** The <code>JoinLeaveListener</code>s registered with this agent */ |
| protected volatile Set listeners = Collections.EMPTY_SET; |
| private final Object listenersLock = new Object(); |
| |
| private CacheCollector collector; |
| |
| /** |
| * The known application VMs. Maps member id to RemoteApplicationVM instances |
| */ |
| protected volatile Map membersMap = Collections.EMPTY_MAP; |
| private final Object membersLock = new Object(); |
| |
| // LOG: used to log WARN for AuthenticationFailedException |
| private final InternalLogWriter securityLogWriter; |
| |
| /** |
| * A queue of <code>SnapshotResultMessage</code>s the are processed by a SnapshotResultDispatcher |
| */ |
| protected BlockingQueue snapshotResults = new LinkedBlockingQueue(); |
| |
| /** A thread that asynchronously handles incoming cache snapshots. */ |
| private SnapshotResultDispatcher snapshotDispatcher; |
| |
| /** Thread that processes membership joins */ |
| protected JoinProcessor joinProcessor; |
| |
| /** Ordered List for processing of pendingJoins; elements are InternalDistributedMember */ |
| protected volatile List pendingJoins = Collections.EMPTY_LIST; |
| |
| /** Lock for altering the contents of the pendingJoins Map and List */ |
| private final Object pendingJoinsLock = new Object(); |
| |
| /** Id of the currentJoin that is being processed */ |
| protected volatile InternalDistributedMember currentJoin; |
| |
| /** |
| * True if the currentJoin needs to be aborted because the member has left |
| */ |
| protected volatile boolean abortCurrentJoin = false; |
| |
| /** |
| * Has this <code>RemoteGfManagerAgent</code> been initialized? That is, after it has been |
| * connected has this agent created admin objects for the initial members of the distributed |
| * system? |
| */ |
| protected volatile boolean initialized = false; |
| |
| /** |
| * Has this agent manager been disconnected? |
| */ |
| private boolean disconnected = false; |
| |
| /** DM MembershipListener for this RemoteGfManagerAgent */ |
| private MyMembershipListener myMembershipListener; |
| private final Object myMembershipListenerLock = new Object(); |
| |
| private DisconnectListener disconnectListener; |
| |
| private static final Object enumerationSync = new Object(); |
| |
| /** |
| * Safe to read, updates controlled by {@link #enumerationSync} |
| */ |
| @MakeNotStatic |
| private static volatile ArrayList allAgents = new ArrayList(); |
| |
| private static void addAgent(RemoteGfManagerAgent toAdd) { |
| synchronized (enumerationSync) { |
| ArrayList replace = new ArrayList(allAgents); |
| replace.add(toAdd); |
| allAgents = replace; |
| } |
| } |
| |
| private static void removeAgent(RemoteGfManagerAgent toRemove) { |
| synchronized (enumerationSync) { |
| ArrayList replace = new ArrayList(allAgents); |
| replace.remove(toRemove); |
| allAgents = replace; |
| } |
| } |
| |
| /** |
| * break any potential circularity in {@link #loadEmergencyClasses()} |
| */ |
| @MakeNotStatic |
| private static volatile boolean emergencyClassesLoaded = false; |
| |
| /** |
| * Ensure that the InternalDistributedSystem class gets loaded. |
| * |
| * @see SystemFailure#loadEmergencyClasses() |
| */ |
| public static void loadEmergencyClasses() { |
| if (emergencyClassesLoaded) |
| return; |
| emergencyClassesLoaded = true; |
| InternalDistributedSystem.loadEmergencyClasses(); |
| } |
| |
| /** |
| * Close all of the RemoteGfManagerAgent's. |
| * |
| * @see SystemFailure#emergencyClose() |
| */ |
| public static void emergencyClose() { |
| ArrayList members = allAgents; // volatile fetch |
| for (int i = 0; i < members.size(); i++) { |
| RemoteGfManagerAgent each = (RemoteGfManagerAgent) members.get(i); |
| each.system.emergencyClose(); |
| } |
| } |
| |
| /** |
| * Return a recent (though possibly incomplete) list of all existing agents |
| * |
| * @return list of agents |
| */ |
| public static ArrayList getAgents() { |
| return allAgents; |
| } |
| |
| // constructors |
| |
| /** |
| * Creates a new <code>RemoteGfManagerAgent</code> accord to the given config. Along the way it |
| * (attempts to) connects to the distributed system. |
| */ |
| public RemoteGfManagerAgent(GfManagerAgentConfig cfg) { |
| if (!(cfg.getTransport() instanceof RemoteTransportConfig)) { |
| throw new IllegalArgumentException( |
| String.format("Expected %s to be a RemoteTransportConfig", |
| cfg.getTransport())); |
| } |
| this.transport = (RemoteTransportConfig) cfg.getTransport(); |
| this.displayName = cfg.getDisplayName(); |
| this.alertListener = cfg.getAlertListener(); |
| if (this.alertListener != null) { |
| if (this.alertListener instanceof JoinLeaveListener) { |
| addJoinLeaveListener((JoinLeaveListener) this.alertListener); |
| } |
| } |
| int tmp = cfg.getAlertLevel(); |
| if (this.alertListener == null) { |
| tmp = Alert.OFF; |
| } |
| alertLevel = tmp; |
| |
| // LOG: get LogWriter from the AdminDistributedSystemImpl -- used for |
| // AuthenticationFailedException |
| InternalLogWriter logWriter = cfg.getLogWriter(); |
| if (logWriter == null) { |
| throw new NullPointerException("LogWriter must not be null"); |
| } |
| if (logWriter.isSecure()) { |
| this.securityLogWriter = logWriter; |
| } else { |
| this.securityLogWriter = LogWriterFactory.toSecurityLogWriter(logWriter); |
| } |
| |
| this.disconnectListener = cfg.getDisconnectListener(); |
| |
| this.joinProcessor = new JoinProcessor(); |
| this.joinProcessor.start(); |
| |
| join(); |
| |
| snapshotDispatcher = new SnapshotResultDispatcher(); |
| snapshotDispatcher.start(); |
| |
| // Note that this makes this instance externally visible. |
| // This is why this class is final. |
| addAgent(this); |
| } |
| |
| private void join() { |
| daemon = new DSConnectionDaemon(); |
| daemon.start(); |
| // give the daemon some time to get us connected |
| // we don't want to wait forever since there may be no one to connect to |
| try { |
| long endTime = System.currentTimeMillis() + 2000; // wait 2 seconds |
| while (!connected && daemon.isAlive() && System.currentTimeMillis() < endTime) { |
| daemon.join(200); |
| } |
| } catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| // Peremptory cancellation check, but keep going |
| this.system.getCancelCriterion().checkCancelInProgress(ignore); |
| } |
| } |
| |
| // static methods |
| |
| |
| /** |
| * Handles an <code>ExecutionException</code> by examining its cause and throwing an appropriate |
| * runtime exception. |
| */ |
| private static void handle(ExecutionException ex) { |
| Throwable cause = ex.getCause(); |
| |
| if (cause instanceof OperationCancelledException) { |
| // Operation was cancelled, we don't necessary want to propagate |
| // this up to the user. |
| return; |
| } |
| if (cause instanceof DistributedSystemDisconnectedException) { |
| throw new DistributedSystemDisconnectedException("While waiting for Future", ex); |
| } |
| |
| // Don't just throw the cause because the stack trace can be |
| // misleading. For instance, the cause might have occurred in a |
| // different thread. In addition to the cause, we also want to |
| // know which code was waiting for the Future. |
| throw new RuntimeAdminException( |
| "An ExceputionException was thrown while waiting for Future.", |
| ex); |
| } |
| |
| // Object methodsg |
| |
| @Override |
| public String toString() { |
| return "Distributed System " + this.transport; |
| } |
| |
| |
| // GfManagerAgent methods |
| |
| /** |
| * Disconnects this agent from the distributed system. If this is a dedicated administration VM, |
| * then the underlying connection to the distributed system is also closed. |
| * |
| * @return true if this agent was disconnected as a result of the invocation |
| * @see RemoteGemFireVM#disconnect |
| */ |
| @Override |
| public boolean disconnect() { |
| boolean disconnectedTrue = false; |
| synchronized (this) { |
| if (disconnected) { |
| return false; |
| } |
| disconnected = true; |
| disconnectedTrue = true; |
| } |
| try { |
| listening = false; |
| // joinProcessor.interrupt(); |
| joinProcessor.shutDown(); |
| boolean removeListener = this.alertLevel != Alert.OFF; |
| if (this.isConnected() || (this.membersMap.size() > 0)) { // Hot fix from 6.6.3 |
| RemoteApplicationVM[] apps = (RemoteApplicationVM[]) listApplications(disconnectedTrue); |
| for (int i = 0; i < apps.length; i++) { |
| try { |
| apps[i].disconnect(removeListener); // hit NPE here in ConsoleDistributionManagerTest so |
| // fixed listApplications to exclude nulls returned |
| // from future |
| } catch (RuntimeException ignore) { |
| // if we can't notify a member that we are disconnecting don't throw |
| // an exception. We need to finish disconnecting other resources. |
| } |
| } |
| try { |
| DistributionManager dm = system.getDistributionManager(); |
| synchronized (this.myMembershipListenerLock) { |
| if (this.myMembershipListener != null) { |
| dm.removeMembershipListener(this.myMembershipListener); |
| } |
| } |
| |
| if (dm instanceof ClusterDistributionManager) { |
| ((ClusterDistributionManager) dm).setAgent(null); |
| } |
| |
| } catch (IllegalArgumentException ignore) { |
| // this can happen when connectToDS has not yet done the add |
| |
| } catch (DistributedSystemDisconnectedException de) { |
| // ignore a forced disconnect and finish clean-up |
| } |
| |
| synchronized (systemLock) { |
| if (system != null && ClusterDistributionManager.isDedicatedAdminVM() |
| && system.isConnected()) { |
| system.disconnect(); |
| } |
| this.system = null; |
| } |
| this.connected = false; |
| } |
| |
| daemon.shutDown(); |
| |
| if (snapshotDispatcher != null) { |
| snapshotDispatcher.shutDown(); |
| } |
| } finally { |
| removeAgent(this); |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean isListening() { |
| return listening; |
| } |
| |
| /** |
| * Returns whether or not this manager agent has created admin objects for the initial members of |
| * the distributed system. |
| * |
| * @since GemFire 4.0 |
| */ |
| @Override |
| public boolean isInitialized() { |
| return this.initialized; |
| } |
| |
| @Override |
| public boolean isConnected() { |
| return this.connected && system != null && system.isConnected(); |
| } |
| |
| @Override |
| public ApplicationVM[] listApplications() { |
| return listApplications(false); |
| } |
| |
| public ApplicationVM[] listApplications(boolean disconnected) {// Hot fix from 6.6.3 |
| if (isConnected() || (this.membersMap.size() > 0 && disconnected)) { |
| // removed synchronization on members... |
| Collection remoteApplicationVMs = new ArrayList(this.membersMap.size()); |
| VMS: for (Iterator iter = this.membersMap.values().iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| for (;;) { |
| try { |
| this.system.getCancelCriterion().checkCancelInProgress(null); |
| } catch (DistributedSystemDisconnectedException de) { |
| // ignore during forced disconnect |
| } |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Object obj = future.get(); |
| if (obj != null) { |
| remoteApplicationVMs.add(obj); |
| } |
| break; |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } catch (CancellationException ex) { |
| continue VMS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| continue VMS; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } // VMS |
| |
| RemoteApplicationVM[] array = new RemoteApplicationVM[remoteApplicationVMs.size()]; |
| remoteApplicationVMs.toArray(array); |
| return array; |
| |
| } else { |
| return new RemoteApplicationVM[0]; |
| } |
| } |
| |
| @Override |
| public GfManagerAgent[] listPeers() { |
| return new GfManagerAgent[0]; |
| } |
| |
| /** |
| * Registers a <code>JoinLeaveListener</code>. on this agent that is notified when membership in |
| * the distributed system changes. |
| */ |
| @Override |
| public void addJoinLeaveListener(JoinLeaveListener observer) { |
| synchronized (this.listenersLock) { |
| final Set oldListeners = this.listeners; |
| if (!oldListeners.contains(observer)) { |
| final Set newListeners = new HashSet(oldListeners); |
| newListeners.add(observer); |
| this.listeners = newListeners; |
| } |
| } |
| } |
| |
| /** |
| * Deregisters a <code>JoinLeaveListener</code> from this agent. |
| */ |
| @Override |
| public void removeJoinLeaveListener(JoinLeaveListener observer) { |
| synchronized (this.listenersLock) { |
| final Set oldListeners = this.listeners; |
| if (oldListeners.contains(observer)) { |
| final Set newListeners = new HashSet(oldListeners); |
| if (newListeners.remove(observer)) { |
| this.listeners = newListeners; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Sets the <code>CacheCollector</code> that <code>CacheSnapshot</code>s are delivered to. |
| */ |
| @Override |
| public synchronized void setCacheCollector(CacheCollector collector) { |
| this.collector = collector; |
| } |
| |
| // misc instance methods |
| |
| public RemoteTransportConfig getTransport() { |
| return this.transport; |
| } |
| |
| /** Is this thread currently sending a message? */ |
| private static final ThreadLocal sending = new ThreadLocal() { |
| @Override |
| protected Object initialValue() { |
| return Boolean.FALSE; |
| } |
| }; |
| |
| /** |
| * Sends an AdminRequest and waits for the AdminReponse |
| */ |
| AdminResponse sendAndWait(AdminRequest msg) { |
| try { |
| if (((Boolean) sending.get()).booleanValue()) { |
| throw new OperationCancelledException( |
| String.format("Recursion detected while sending %s", |
| msg)); |
| |
| } else { |
| sending.set(Boolean.TRUE); |
| } |
| |
| ClusterDistributionManager dm = |
| (ClusterDistributionManager) this.system.getDistributionManager(); |
| if (isConnected()) { |
| return msg.sendAndWait(dm); |
| } else { |
| // bug 39824: generate CancelException if we're shutting down |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| throw new RuntimeAdminException( |
| String.format("%s is not currently connected.", |
| this)); |
| } |
| |
| } finally { |
| sending.set(Boolean.FALSE); |
| } |
| } |
| |
| /** |
| * Sends a message and does not wait for a response |
| */ |
| void sendAsync(DistributionMessage msg) { |
| if (system != null) { |
| system.getDistributionManager().putOutgoing(msg); |
| } |
| } |
| |
| /** |
| * Returns the distributed system member (application VM or system VM) with the given |
| * <code>id</code>. |
| */ |
| public RemoteGemFireVM getMemberById(InternalDistributedMember id) { |
| return getApplicationById(id); |
| } |
| |
| /** |
| * Returns the application VM with the given <code>id</code>. |
| */ |
| public RemoteApplicationVM getApplicationById(InternalDistributedMember id) { |
| if (isConnected()) { |
| // removed synchronized(members) |
| Future future = (Future) this.membersMap.get(id); |
| if (future == null) { |
| return null; |
| } |
| for (;;) { |
| this.system.getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| return (RemoteApplicationVM) future.get(); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } catch (CancellationException ex) { |
| return null; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns a representation of the application VM with the given distributed system id. If there |
| * does not already exist a representation for that member, a new one is created. |
| */ |
| private RemoteApplicationVM addMember(final InternalDistributedMember id) { |
| boolean runFuture = false; |
| FutureTask future; |
| synchronized (this.membersLock) { |
| final Map oldMembersMap = this.membersMap; |
| future = (FutureTask) oldMembersMap.get(id); |
| if (future == null) { |
| runFuture = true; |
| if (this.abortCurrentJoin) |
| return null; |
| future = new FutureTask(new Callable() { |
| @Override |
| public Object call() throws Exception { |
| // Do this work in a Future to avoid deadlock seen in |
| // bug 31562. |
| RemoteGfManagerAgent agent = RemoteGfManagerAgent.this; |
| RemoteApplicationVM result = new RemoteApplicationVM(agent, id, alertLevel); |
| result.startStatDispatcher(); |
| if (agent.abortCurrentJoin) { |
| result.stopStatListening(); |
| return null; |
| } |
| return result; |
| } |
| }); |
| Map newMembersMap = new HashMap(oldMembersMap); |
| newMembersMap.put(id, future); |
| if (this.abortCurrentJoin) |
| return null; |
| this.membersMap = newMembersMap; |
| } |
| } |
| |
| if (runFuture) { |
| // Run future outside of membersLock |
| future.run(); |
| } |
| |
| for (;;) { |
| this.system.getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| return (RemoteApplicationVM) future.get(); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } catch (CancellationException ex) { |
| return null; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } |
| |
| /** |
| * Removes and returns the representation of the application VM member of the distributed system |
| * with the given <code>id</code>. |
| */ |
| protected RemoteApplicationVM removeMember(InternalDistributedMember id) { |
| Future future = null; |
| synchronized (this.membersLock) { |
| Map oldMembersMap = this.membersMap; |
| if (oldMembersMap.containsKey(id)) { |
| Map newMembersMap = new HashMap(oldMembersMap); |
| future = (Future) newMembersMap.remove(id); |
| if (future != null) { |
| this.membersMap = newMembersMap; |
| } |
| } |
| } |
| |
| if (future != null) { |
| future.cancel(true); |
| for (;;) { |
| synchronized (systemLock) { |
| if (system == null) { |
| return null; |
| } |
| this.system.getCancelCriterion().checkCancelInProgress(null); |
| } |
| |
| boolean interrupted = Thread.interrupted(); |
| try { |
| return (RemoteApplicationVM) future.get(); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } catch (CancellationException ex) { |
| return null; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Places a <code>SnapshotResultMessage</code> on a queue to be processed asynchronously. |
| */ |
| void enqueueSnapshotResults(SnapshotResultMessage msg) { |
| if (!isListening()) { |
| return; |
| } |
| for (;;) { |
| this.system.getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| snapshotResults.put(msg); |
| break; |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Sends the given <code>alert</code> to this agent's {@link AlertListener}. |
| */ |
| void callAlertListener(Alert alert) { |
| if (!isListening()) { |
| return; |
| } |
| if (alertListener != null && alert.getLevel() >= this.alertLevel) { |
| alertListener.alert(alert); |
| } |
| } |
| |
| /** |
| * Invokes the {@link CacheCollector#resultsReturned} method of this agent's cache collector. |
| */ |
| protected void callCacheCollector(CacheSnapshot results, InternalDistributedMember sender, |
| int snapshotId) { |
| if (!isListening()) { |
| return; |
| } |
| if (collector != null) { |
| GemFireVM vm = getMemberById(sender); |
| if (vm != null) { |
| collector.resultsReturned(results, vm, snapshotId); |
| } |
| } |
| } |
| |
| protected DisconnectListener getDisconnectListener() { |
| synchronized (this) { |
| return this.disconnectListener; |
| } |
| } |
| |
| /** |
| * Creates a new {@link InternalDistributedSystem} connection for this agent. If one alread |
| * exists, it is <code>disconnected</code> and a new one is created. |
| */ |
| protected void connectToDS() { |
| if (!isListening()) { |
| return; |
| } |
| Properties props = this.transport.toDSProperties(); |
| if (this.displayName != null && this.displayName.length() > 0) { |
| props.setProperty("name", this.displayName); |
| } |
| |
| synchronized (systemLock) { |
| if (system != null) { |
| system.disconnect(); |
| system = null; |
| } |
| this.system = (InternalDistributedSystem) InternalDistributedSystem.connectForAdmin(props); |
| } |
| |
| DistributionManager dm = system.getDistributionManager(); |
| if (dm instanceof ClusterDistributionManager) { |
| ((ClusterDistributionManager) dm).setAgent(this); |
| } |
| |
| synchronized (this) { |
| this.disconnected = false; |
| } |
| |
| this.system.addDisconnectListener(new InternalDistributedSystem.DisconnectListener() { |
| @Override |
| public String toString() { |
| return String.format("Disconnect listener for %s", |
| RemoteGfManagerAgent.this); |
| } |
| |
| @Override |
| public void onDisconnect(InternalDistributedSystem sys) { |
| // Before the disconnect handler is called, the InternalDistributedSystem has already marked |
| // itself for |
| // the disconnection. Hence the check for RemoteGfManagerAgent.this.isConnected() always |
| // returns false. |
| // Hence commenting the same. |
| // if(RemoteGfManagerAgent.this.isConnected()) { |
| boolean reconnect = sys.isReconnecting(); |
| if (!reconnect) { |
| final DisconnectListener listener = RemoteGfManagerAgent.this.getDisconnectListener(); |
| if (RemoteGfManagerAgent.this.disconnect()) { // returns true if disconnected during this |
| // call |
| if (listener != null) { |
| listener.onDisconnect(sys); |
| } |
| } |
| } |
| } |
| }); |
| InternalDistributedSystem.addReconnectListener(new ReconnectListener() { |
| @Override |
| public void reconnecting(InternalDistributedSystem oldsys) {} |
| |
| @Override |
| public void onReconnect(InternalDistributedSystem oldsys, InternalDistributedSystem newsys) { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug("RemoteGfManagerAgent.onReconnect attempting to join new distributed system"); |
| } |
| join(); |
| } |
| }); |
| |
| synchronized (this.myMembershipListenerLock) { |
| this.myMembershipListener = new MyMembershipListener(); |
| dm.addMembershipListener(this.myMembershipListener); |
| Set initialMembers = dm.getDistributionManagerIds(); |
| this.myMembershipListener.addMembers(initialMembers); |
| |
| if (logger.isDebugEnabled()) { |
| StringBuffer sb = new StringBuffer("[RemoteGfManagerAgent] "); |
| sb.append("Connected to DS with "); |
| sb.append(initialMembers.size()); |
| sb.append(" members: "); |
| |
| for (Iterator it = initialMembers.iterator(); it.hasNext();) { |
| InternalDistributedMember member = (InternalDistributedMember) it.next(); |
| sb.append(member); |
| sb.append(" "); |
| } |
| |
| this.logger.debug(sb.toString()); |
| } |
| |
| connected = true; |
| |
| for (Iterator it = initialMembers.iterator(); it.hasNext();) { |
| InternalDistributedMember member = (InternalDistributedMember) it.next(); |
| |
| // Create the admin objects synchronously. We don't need to use |
| // the JoinProcess when we first start up. |
| try { |
| handleJoined(member); |
| } catch (OperationCancelledException ex) { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "join cancelled by departure"); |
| } |
| } |
| } |
| |
| this.initialized = true; |
| } // sync |
| } |
| |
| //////////// inner classes /////////////////////////// |
| |
| /** |
| * A Daemon thread that asynchronously connects to the distributed system. It is necessary to |
| * nicely handle the situation when the user accidentally connects to a distributed system with no |
| * members or attempts to connect to a distributed system whose member run a different version of |
| * GemFire. |
| */ |
| private class DSConnectionDaemon extends LoggingThread { |
| /** Has this thread been told to stop? */ |
| private volatile boolean shutDown = false; |
| |
| protected DSConnectionDaemon() { |
| super("DSConnectionDaemon"); |
| } |
| |
| public void shutDown() { |
| shutDown = true; |
| this.interrupt(); |
| } |
| |
| /** |
| * Keep trying to connect to the distributed system. If we have problems connecting, the agent |
| * will not be marked as "connected". |
| */ |
| @Override |
| public void run() { |
| TOP: while (!shutDown) { |
| SystemFailure.checkFailure(); |
| try { |
| connected = false; |
| initialized = false; |
| if (!shutDown) { |
| connectToDS(); |
| |
| // If we're successful, this thread is done |
| if (isListening()) { |
| Assert.assertTrue(system != null); |
| } |
| return; |
| } |
| } catch (IncompatibleSystemException ise) { |
| logger.fatal(ise.getMessage(), ise); |
| callAlertListener(new VersionMismatchAlert(RemoteGfManagerAgent.this, ise.getMessage())); |
| } catch (Exception e) { |
| for (Throwable cause = e; cause != null; cause = cause.getCause()) { |
| if (cause instanceof InterruptedException) { |
| // We were interrupted while connecting. Most likely we |
| // are being shutdown. |
| if (shutDown) { |
| break TOP; |
| } |
| } |
| |
| if (cause instanceof AuthenticationFailedException) { |
| // Incorrect credentials. Log & Shutdown |
| shutDown = true; |
| securityLogWriter.warning( |
| "[RemoteGfManagerAgent]: An AuthenticationFailedException was caught while connecting to DS", |
| e); |
| break TOP; |
| } |
| } |
| |
| logger.debug("[RemoteGfManagerAgent] While connecting to DS", e); |
| } |
| try { |
| sleep(1000); |
| } catch (InterruptedException ignore) { |
| // We're just exiting, no need to restore the interrupt bit. |
| } |
| } |
| connected = false; |
| initialized = false; |
| } |
| |
| } // end DSConnectionDaemon |
| |
| /** |
| * A daemon thread that reads {@link SnapshotResultMessage}s from a queue and invokes the |
| * <code>CacheCollector</code> accordingly. |
| */ |
| private class SnapshotResultDispatcher extends LoggingThread { |
| private volatile boolean shutDown = false; |
| |
| public SnapshotResultDispatcher() { |
| super("SnapshotResultDispatcher"); |
| } |
| |
| public void shutDown() { |
| shutDown = true; |
| this.interrupt(); |
| } |
| |
| @Override |
| public void run() { |
| while (!shutDown) { |
| SystemFailure.checkFailure(); |
| try { |
| SnapshotResultMessage msg = (SnapshotResultMessage) snapshotResults.take(); |
| callCacheCollector(msg.getSnapshot(), msg.getSender(), msg.getSnapshotId()); |
| yield(); // TODO: this is a hot thread |
| } catch (InterruptedException ignore) { |
| // We'll just exit, no need to reset interrupt bit. |
| if (shutDown) { |
| break; |
| } |
| logger.warn("Ignoring strange interrupt", ignore); |
| } catch (Exception ex) { |
| logger.fatal(ex.getMessage(), ex); |
| } |
| } |
| } |
| } // end SnapshotResultDispatcher |
| |
| @Override |
| public DistributionManager getDM() { |
| InternalDistributedSystem sys = this.system; |
| if (sys == null) { |
| return null; |
| } |
| return sys.getDistributionManager(); |
| } |
| |
| /** Returns the bind address this vm uses to connect to this system (Kirk Lund) */ |
| public String getBindAddress() { |
| return this.transport.getBindAddress(); |
| } |
| |
| /** Returns true if this vm is using a non-default bind address (Kirk Lund) */ |
| public boolean hasNonDefaultBindAddress() { |
| if (getBindAddress() == null) |
| return false; |
| return !DistributionConfig.DEFAULT_BIND_ADDRESS.equals(getBindAddress()); |
| } |
| |
| /** |
| * Sets the alert level for this manager agent. Sends a {@link AlertLevelChangeMessage} to each |
| * member of the distributed system. |
| */ |
| @Override |
| public void setAlertLevel(int level) { |
| this.alertLevel = level; |
| AlertLevelChangeMessage m = AlertLevelChangeMessage.create(level); |
| sendAsync(m); |
| } |
| |
| /** |
| * Returns the distributed system administered by this agent. |
| */ |
| @Override |
| public InternalDistributedSystem getDSConnection() { |
| return this.system; |
| } |
| |
| /** |
| * Handles a membership join asynchronously from the memberJoined notification. Sets and clears |
| * current join. Also makes several checks to support aborting of the current join. |
| */ |
| protected void handleJoined(InternalDistributedMember id) { |
| if (!isListening()) { |
| return; |
| } |
| |
| // set current join and begin handling of it... |
| this.currentJoin = id; |
| try { |
| GemFireVM member = null; |
| switch (id.getVmKind()) { |
| case ClusterDistributionManager.NORMAL_DM_TYPE: |
| member = addMember(id); |
| break; |
| case ClusterDistributionManager.LOCATOR_DM_TYPE: |
| break; |
| case ClusterDistributionManager.ADMIN_ONLY_DM_TYPE: |
| break; |
| case ClusterDistributionManager.LONER_DM_TYPE: |
| break; // should this ever happen? :-) |
| default: |
| throw new IllegalArgumentException(String.format("Unknown VM Kind: %s", |
| Integer.valueOf(id.getVmKind()))); |
| } |
| |
| // if we have a valid member process it... |
| if (this.abortCurrentJoin) { |
| return; |
| } |
| if (member != null) { |
| if (this.abortCurrentJoin) { |
| return; |
| } |
| for (Iterator it = this.listeners.iterator(); it.hasNext();) { |
| if (this.abortCurrentJoin) { |
| return; |
| } |
| JoinLeaveListener l = (JoinLeaveListener) it.next(); |
| try { |
| l.nodeJoined(RemoteGfManagerAgent.this, member); |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable e) { |
| SystemFailure.checkFailure(); |
| logger.warn("Listener threw an exception.", e); |
| } |
| } |
| } |
| |
| } finally { |
| // finished this join so remove it... |
| removePendingJoins(id); |
| |
| // clean up current join and abort flag... |
| if (this.abortCurrentJoin) { |
| logger.info("aborted {}", id); |
| } |
| this.currentJoin = null; |
| this.abortCurrentJoin = false; |
| } |
| } |
| |
| /** |
| * Adds a pending join entry. Note: attempting to reuse the same ArrayList instance results in |
| * some interesting deadlocks. |
| */ |
| protected void addPendingJoin(InternalDistributedMember id) { |
| synchronized (this.pendingJoinsLock) { |
| List oldPendingJoins = this.pendingJoins; |
| if (!oldPendingJoins.contains(id)) { |
| List newPendingJoins = new ArrayList(oldPendingJoins); |
| newPendingJoins.add(id); |
| this.pendingJoins = newPendingJoins; |
| } |
| } |
| } |
| |
| /** |
| * Removes any pending joins that match the member id. Note: attempting to reuse the same |
| * ArrayList instance results in some interesting deadlocks. |
| */ |
| private void removePendingJoins(InternalDistributedMember id) { |
| synchronized (this.pendingJoinsLock) { |
| List oldPendingJoins = this.pendingJoins; |
| if (oldPendingJoins.contains(id)) { |
| List newPendingJoins = new ArrayList(oldPendingJoins); |
| newPendingJoins.remove(id); |
| this.pendingJoins = newPendingJoins; |
| } |
| } |
| } |
| |
| /** |
| * Cancels any pending joins that match the member id. |
| */ |
| protected void cancelPendingJoins(InternalDistributedMember id) { |
| try { |
| // pause the join processor thread... |
| this.joinProcessor.pauseHandling(); |
| |
| // remove any further pending joins (should't be any)... |
| removePendingJoins(id); |
| |
| // abort any in-process handling of the member id... |
| this.joinProcessor.abort(id); |
| } finally { |
| AdminWaiters.cancelWaiters(id); |
| this.joinProcessor.resumeHandling(); |
| } |
| } |
| |
| /** |
| * Processes pending membership joins in a dedicated thread. If we processed the joins in the same |
| * thread as the membership handler, then we run the risk of getting deadlocks and such. |
| */ |
| // FIXME: Revisit/redesign this code |
| private class JoinProcessor extends LoggingThread { |
| private volatile boolean paused = false; |
| private volatile boolean shutDown = false; |
| private volatile InternalDistributedMember id; |
| private final Object lock = new Object(); |
| |
| public JoinProcessor() { |
| super("JoinProcessor"); |
| } |
| |
| public void shutDown() { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor: shutting down"); |
| } |
| this.shutDown = true; |
| this.interrupt(); |
| } |
| |
| private void pauseHandling() { |
| this.paused = true; |
| } |
| |
| private void resumeHandling() { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor: resuming. Is alive? {}", |
| this.isAlive()); |
| } |
| |
| // unpause if paused during a cancel... |
| this.paused = false; |
| |
| // notify to wake up... |
| synchronized (this.lock) { |
| this.lock.notify(); |
| } |
| } |
| |
| public void abort(InternalDistributedMember memberId) { |
| // abort if current join matches id... |
| if (memberId.equals(RemoteGfManagerAgent.this.currentJoin)) { |
| RemoteGfManagerAgent.this.abortCurrentJoin = true; |
| this.interrupt(); |
| } |
| // cancel handling of current event if it matches id... |
| if (this.id != null && this.id.equals(memberId)) { |
| this.id = null; |
| } |
| } |
| |
| @Override |
| public void run() { |
| /* Used to check whether there were pendingJoins before waiting */ |
| boolean noPendingJoins = false; |
| OUTER: while (!this.shutDown) { |
| SystemFailure.checkFailure(); |
| try { |
| if (!RemoteGfManagerAgent.this.isListening()) { |
| shutDown(); |
| } |
| |
| noPendingJoins = RemoteGfManagerAgent.this.pendingJoins.isEmpty(); |
| if (noPendingJoins && logger.isDebugEnabled()) { |
| logger.debug("Pausing as there are no pending joins ... "); |
| } |
| |
| // if paused OR no pendingJoins then just wait... |
| if (this.paused || noPendingJoins) { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor is about to wait..."); |
| } |
| synchronized (this.lock) { |
| this.lock.wait(); |
| } |
| } |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor has woken up..."); |
| } |
| if (this.paused) |
| continue; |
| |
| // if no join was already in process or if aborted, get a new one... |
| if (this.id == null) { |
| List pendingJoinsRef = RemoteGfManagerAgent.this.pendingJoins; |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor pendingJoins: {}", |
| pendingJoinsRef.size()); |
| } |
| if (pendingJoinsRef.size() > 0) { |
| this.id = (InternalDistributedMember) pendingJoinsRef.get(0); |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor got a membership event for {}", |
| this.id); |
| } |
| } |
| } |
| if (this.paused) |
| continue; |
| |
| // process the join... |
| if (this.id != null) { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor handling join for {}", this.id); |
| } |
| try { |
| RemoteGfManagerAgent.this.handleJoined(this.id); |
| } finally { |
| this.id = null; |
| } |
| } |
| |
| } catch (CancelException e) { |
| // we're done! |
| shutDown = true; // for safety |
| break; |
| } catch (InterruptedException ignore) { |
| // When this thread is "paused", it is interrupted |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor has been interrupted..."); |
| } |
| if (shutDown) { |
| break; |
| } |
| if (this.paused || noPendingJoins) {// fix for #39893 |
| /* |
| * JoinProcessor waits when: 1. this.paused is set to true 2. There are no pending joins |
| * |
| * If the JoinProcessor is interrupted when it was waiting due to second reason, it |
| * should still continue after catching InterruptedException. From code, currently, |
| * JoinProcessor is interrupted through JoinProcessor.abort() method which is called |
| * when a member departs as part of cancelPendingJoin(). |
| */ |
| if (logger.isDebugEnabled()) { |
| logger.debug("JoinProcessor was interrupted when it was paused, now resuming ...", |
| ignore); |
| } |
| noPendingJoins = false; |
| continue; |
| } |
| break; // Panic! |
| |
| } catch (OperationCancelledException ex) { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "join cancelled by departure"); |
| } |
| continue; |
| |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable e) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause()) { |
| if (cause instanceof InterruptedException) { |
| if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { |
| logger.trace(LogMarker.DM_VERBOSE, "JoinProcessor has been interrupted..."); |
| } |
| continue OUTER; |
| } |
| } |
| logger.error("JoinProcessor caught exception...", e); |
| } // catch Throwable |
| } // while !shutDown |
| } |
| } |
| |
| private class MyMembershipListener implements MembershipListener { |
| |
| private final Set distributedMembers = new HashSet(); |
| |
| protected MyMembershipListener() {} |
| |
| protected void addMembers(Set initMembers) { |
| distributedMembers.addAll(initMembers); |
| } |
| |
| /** |
| * This method is invoked when a new member joins the system. If the member is an application VM |
| * or a GemFire system manager, we note it. |
| * |
| * @see JoinLeaveListener#nodeJoined |
| */ |
| @Override |
| public void memberJoined(DistributionManager distributionManager, |
| final InternalDistributedMember id) { |
| if (!isListening()) { |
| return; |
| } |
| synchronized (this) { |
| if (!this.distributedMembers.contains(id)) { |
| this.distributedMembers.add(id); |
| addPendingJoin(id); |
| joinProcessor.resumeHandling(); |
| } |
| } |
| } |
| |
| @Override |
| public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, |
| InternalDistributedMember whoSuspected, String reason) {} |
| |
| @Override |
| public void quorumLost(DistributionManager distributionManager, |
| Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} |
| |
| /** |
| * This method is invoked after a member has explicitly left the system. It may not get invoked |
| * if a member becomes unreachable due to crash or network problems. |
| * |
| * @see JoinLeaveListener#nodeCrashed |
| * @see JoinLeaveListener#nodeLeft |
| */ |
| @Override |
| public void memberDeparted(DistributionManager distributionManager, |
| InternalDistributedMember id, boolean crashed) { |
| synchronized (this) { |
| if (!this.distributedMembers.remove(id)) { |
| return; |
| } |
| cancelPendingJoins(id); |
| if (!isListening()) { |
| return; |
| } |
| } |
| |
| // remove the member... |
| |
| RemoteGemFireVM member = null; |
| switch (id.getVmKind()) { |
| case ClusterDistributionManager.NORMAL_DM_TYPE: |
| member = removeMember(id); |
| break; |
| case ClusterDistributionManager.ADMIN_ONLY_DM_TYPE: |
| break; |
| case ClusterDistributionManager.LOCATOR_DM_TYPE: |
| break; |
| case ClusterDistributionManager.LONER_DM_TYPE: |
| break; // should this ever happen? |
| default: |
| throw new IllegalArgumentException( |
| "Unknown VM Kind"); |
| } |
| |
| // clean up and notify JoinLeaveListeners... |
| if (member != null) { |
| for (Iterator it = listeners.iterator(); it.hasNext();) { |
| JoinLeaveListener l = (JoinLeaveListener) it.next(); |
| if (crashed) { |
| l.nodeCrashed(RemoteGfManagerAgent.this, member); |
| } else { |
| l.nodeLeft(RemoteGfManagerAgent.this, member); |
| } |
| } |
| member.stopStatListening(); |
| } |
| } |
| } |
| |
| } |