| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.admin.remote; |
| |
| import java.io.File; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.admin.AdminException; |
| import org.apache.geode.admin.GemFireHealth; |
| import org.apache.geode.admin.GemFireHealthConfig; |
| import org.apache.geode.admin.GemFireMemberStatus; |
| import org.apache.geode.admin.OperationCancelledException; |
| import org.apache.geode.admin.RegionSubRegionSnapshot; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Config; |
| import org.apache.geode.internal.admin.AdminBridgeServer; |
| import org.apache.geode.internal.admin.Alert; |
| import org.apache.geode.internal.admin.CacheInfo; |
| import org.apache.geode.internal.admin.DLockInfo; |
| import org.apache.geode.internal.admin.GemFireVM; |
| import org.apache.geode.internal.admin.GfManagerAgent; |
| import org.apache.geode.internal.admin.HealthListener; |
| import org.apache.geode.internal.admin.ListenerIdMap; |
| import org.apache.geode.internal.admin.Stat; |
| import org.apache.geode.internal.admin.StatAlertDefinition; |
| import org.apache.geode.internal.admin.StatListener; |
| import org.apache.geode.internal.admin.StatResource; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| |
| /** |
| * Provides access to a remote gemfire VM for purposes of gathering statistics and other info |
| * specific to that VM. |
| * |
| */ |
| public abstract class RemoteGemFireVM implements GemFireVM { |
| |
| protected final RemoteGfManagerAgent agent; |
| protected final InternalDistributedMember id; |
| |
| private volatile String name = null; |
| private volatile InetAddress host = null; |
| private volatile File workingDir = null; |
| private volatile Date birthDate = null; |
| private volatile File gemfireDir = null; |
| private volatile Boolean isDedicatedCacheServer = null; |
| |
| /** |
| * Keeps track of the <code>StatListener</code>s registered on statistics sampled in the remote |
| * VM. key is listenerId; value is StatListener. |
| */ |
| protected ListenerIdMap statListeners = new ListenerIdMap(); |
| private final Object statListenersLock = new Object(); |
| |
| /** |
| * A thread that asynchronously dispatches callbacks to <code>StatListener</code>s. |
| */ |
| protected final StatDispatcher dispatcher; |
| |
| /** |
| * The classpath from which to load the classes of objects inspected from this remote VM. |
| */ |
| protected volatile String inspectionClasspath; |
| |
| /** |
| * Has the distributed system member represented by this <code>GemFireVM</code> become |
| * unreachable? If so, we should not try to communicate with it. |
| */ |
| protected volatile boolean unreachable; |
| |
| /** How are objects in this remote VM inspected? */ |
| protected volatile int cacheInspectionMode = LIGHTWEIGHT_CACHE_VALUE; |
| |
| protected final Object healthLock = new Object(); |
| protected HealthListener healthListener = null; |
| protected int healthListenerId = 0; |
| |
| // constructors |
| |
| /** |
| * Creates a <code>RemoteApplicationVM</code> in a given distributed system (<code>agent</code>) |
| * with the given <code>id</code>. |
| * <p/> |
| * You MUST invoke {@link #startStatDispatcher()} immediately after constructing an instance. |
| * |
| * @param alertLevel The level of {@link Alert}s that this administration console should receive |
| * from this member of the distributed system. |
| */ |
| public RemoteGemFireVM(RemoteGfManagerAgent agent, InternalDistributedMember id, int alertLevel) { |
| this.agent = agent; |
| if (id == null) { |
| throw new NullPointerException( |
| "Cannot create a RemoteGemFireVM with a null id."); |
| } |
| this.id = id; |
| this.dispatcher = new StatDispatcher(); |
| sendAsync(AdminConsoleMessage.create(alertLevel)); |
| } |
| |
| public void startStatDispatcher() { |
| this.dispatcher.start(); |
| } |
| |
| // Object methods |
| |
| @Override // GemStoneAddition |
| public String toString() { |
| // fix for 31198 |
| String vmName = null; |
| try { |
| vmName = getName(); |
| } catch (OperationCancelledException e) { |
| // ignore and leave name equal to null |
| } |
| if (vmName == null || vmName.length() == 0) { |
| return this.id.toString(); |
| } |
| return vmName; |
| } |
| |
| @Override |
| public boolean equals(Object object) { |
| if (this == object) { |
| return true; |
| } |
| if (object instanceof RemoteGemFireVM) { |
| return this.id.equals(((RemoteGemFireVM) object).id); |
| } |
| return false; |
| } |
| |
| @Override // GemStoneAddition |
| public int hashCode() { |
| return this.id.hashCode(); |
| } |
| |
| // GemFireVM methods |
| |
| /** |
| * Returns the name of the remote system connection. |
| */ |
| @Override |
| public String getName() { |
| if (this.name == null) { |
| initialize(); |
| } |
| return name; |
| } |
| |
| /** |
| * Returns the host the manager is running on. |
| */ |
| @Override |
| public InetAddress getHost() { |
| if (this.host == null) { |
| initialize(); |
| } |
| return host; |
| } |
| |
| @Override |
| public File getWorkingDirectory() { |
| if (this.workingDir == null) { |
| initialize(); |
| } |
| return this.workingDir; |
| } |
| |
| @Override |
| public File getGeodeHomeDir() { |
| if (this.gemfireDir == null) { |
| initialize(); |
| } |
| return gemfireDir; |
| } |
| |
| @Override |
| public Date getBirthDate() { |
| if (this.birthDate == null) { |
| initialize(); |
| } |
| return birthDate; |
| } |
| |
| public boolean isDedicatedCacheServer() { |
| if (this.isDedicatedCacheServer == null) { |
| initialize(); |
| } |
| return this.isDedicatedCacheServer.booleanValue(); |
| } |
| |
| private void initialize() { |
| FetchHostResponse response = (FetchHostResponse) sendAndWait(FetchHostRequest.create()); |
| this.name = response.getName(); |
| this.host = response.getHost(); |
| this.workingDir = response.getWorkingDirectory(); |
| this.gemfireDir = response.getGeodeHomeDir(); |
| this.birthDate = new Date(response.getBirthDate()); |
| this.isDedicatedCacheServer = Boolean.valueOf(response.isDedicatedCacheServer()); |
| } |
| |
| /** |
| * Retrieves all statistic resources from the remote vm. |
| * |
| * @return array of all statistic resources |
| * |
| * @see FetchStatsRequest |
| * @see FetchStatsResponse#getAllStats |
| */ |
| @Override |
| public StatResource[] getAllStats() { |
| FetchStatsResponse resp = (FetchStatsResponse) sendAndWait(FetchStatsRequest.create(null)); |
| return resp.getAllStats(this); |
| } |
| |
| /** |
| * Retrieves all statistic resources from the remote VM except for those involving SharedClass. |
| * |
| * @return array of non-SharedClass statistic resources |
| * |
| * @see FetchStatsRequest |
| * @see FetchStatsResponse#getStats |
| */ |
| @Override |
| public StatResource[] getStats(String statisticsTypeName) { |
| FetchStatsResponse resp = |
| (FetchStatsResponse) sendAndWait(FetchStatsRequest.create(statisticsTypeName)); |
| return resp.getAllStats(this); |
| } |
| |
| /** |
| * Returns information about distributed locks held by the remote VM. |
| * |
| * @see FetchDistLockInfoRequest |
| */ |
| @Override |
| public DLockInfo[] getDistributedLockInfo() { |
| FetchDistLockInfoResponse resp = |
| (FetchDistLockInfoResponse) sendAndWait(FetchDistLockInfoRequest.create()); |
| return resp.getLockInfos(); |
| } |
| |
| /** |
| * Adds a <code>StatListener</code> that is notified when a statistic in a given statistics |
| * instance changes value. |
| * |
| * @param observer The listener to be notified |
| * @param observedResource The statistics instance to be observed |
| * @param observedStat The statistic to be observed |
| * |
| * @see AddStatListenerRequest |
| */ |
| @Override |
| public void addStatListener(StatListener observer, StatResource observedResource, |
| Stat observedStat) { |
| AddStatListenerResponse resp = (AddStatListenerResponse) sendAndWait( |
| AddStatListenerRequest.create(observedResource, observedStat)); |
| int listenerId = resp.getListenerId(); |
| synchronized (this.statListenersLock) { |
| this.statListeners.put(listenerId, observer); |
| } |
| } |
| |
| /** |
| * Notes that several statistics values have been updated in the distributed system member modeled |
| * by this <code>RemoteGemFireVM</code> and invokes the {@link StatListener}s accordingly. Note |
| * that the listener notification happens asynchronously. |
| * |
| * @param timestamp The time at which the statistics were sampled |
| * @param listenerIds The <code>id</code>s of the <code>StatListener</code>s to be notified. |
| * @param values The new values of the statistics |
| */ |
| public void callStatListeners(long timestamp, int[] listenerIds, double[] values) { |
| dispatcher.put(new DispatchArgs(timestamp, listenerIds, values)); |
| } |
| |
| |
| |
| /** |
| * Invokes the callback methods on a bunch of <code>StatListener</code>s in response to a |
| * statistics update message being received. This method is invoked in its own thread. |
| * |
| * for each listener in statListeners call stat value changed if its id is in listenerIds call |
| * stat value unchanged if its id is not in listenerIds call cancelStatListener and |
| * statListeners.remove if its id is negative in listenerIds |
| * |
| * @see #cancelStatListener |
| */ |
| protected void internalCallStatListeners(long timestamp, int[] listenerIds, double[] values) { |
| ListenerIdMap.Entry[] entries = null; |
| List listenersToRemove = new ArrayList(); |
| synchronized (this.statListenersLock) { |
| entries = this.statListeners.entries(); |
| } |
| |
| for (int j = 0; j < entries.length; j++) { |
| int listenerId = entries[j].getKey(); |
| StatListener sl = (StatListener) entries[j].getValue(); |
| int i; |
| for (i = 0; i < listenerIds.length; i++) { |
| if (listenerIds[i] == listenerId || listenerIds[i] == -listenerId) { |
| break; |
| } |
| } |
| if (i == listenerIds.length) { |
| sl.statValueUnchanged(timestamp); |
| } else if (listenerIds[i] < 0) { // Stat resource went away |
| listenersToRemove.add(Integer.valueOf(listenerId)); |
| } else { |
| sl.statValueChanged(values[i], timestamp); |
| } |
| } |
| |
| synchronized (this.statListenersLock) { |
| for (Iterator iter = listenersToRemove.iterator(); iter.hasNext();) { |
| int i = ((Integer) iter.next()).intValue(); |
| statListeners.remove(i); |
| cancelStatListener(i); |
| } |
| } |
| } |
| |
| /** |
| * Sends a message to the remote VM letting it know that the listener with the given id no longer |
| * needs events set to it. |
| */ |
| private void cancelStatListener(int listenerId) { |
| sendAndWait(CancelStatListenerRequest.create(listenerId)); |
| } |
| |
| /** |
| * Removes a <code>StatListener</code> that receives updates from the remote member VM. |
| */ |
| @Override |
| public void removeStatListener(StatListener observer) { |
| int listenerId = -1; |
| boolean foundIt = false; |
| synchronized (this.statListenersLock) { |
| ListenerIdMap.EntryIterator it = this.statListeners.iterator(); |
| ListenerIdMap.Entry e = it.next(); |
| while (e != null) { |
| if (e.getValue() == observer) { |
| foundIt = true; |
| listenerId = e.getKey(); |
| this.statListeners.remove(listenerId); |
| break; |
| } |
| e = it.next(); |
| } |
| } |
| if (foundIt) { |
| cancelStatListener(listenerId); |
| } |
| } |
| |
| /** |
| * Returns the configuration of the remote VM. |
| * |
| * @see FetchSysCfgRequest |
| */ |
| @Override |
| public void addHealthListener(HealthListener observer, GemFireHealthConfig cfg) { |
| synchronized (this.healthLock) { |
| this.healthListener = observer; |
| AddHealthListenerResponse response = |
| (AddHealthListenerResponse) sendAndWait(AddHealthListenerRequest.create(cfg)); |
| this.healthListenerId = response.getHealthListenerId(); |
| } |
| } |
| |
| @Override |
| public void removeHealthListener() { |
| synchronized (this.healthLock) { |
| this.healthListener = null; |
| if (this.healthListenerId != 0) { |
| sendAndWait(RemoveHealthListenerRequest.create(this.healthListenerId)); |
| this.healthListenerId = 0; |
| } |
| } |
| } |
| |
| @Override |
| public void resetHealthStatus() { |
| synchronized (this.healthLock) { |
| if (this.healthListenerId != 0) { |
| sendAndWait(ResetHealthStatusRequest.create(this.healthListenerId)); |
| } |
| } |
| } |
| |
| @Override |
| public String[] getHealthDiagnosis(GemFireHealth.Health healthCode) { |
| synchronized (this.healthLock) { |
| if (this.healthListenerId != 0) { |
| FetchHealthDiagnosisResponse response = (FetchHealthDiagnosisResponse) sendAndWait( |
| FetchHealthDiagnosisRequest.create(this.healthListenerId, healthCode)); |
| return response.getDiagnosis(); |
| } else { |
| return new String[] {}; |
| } |
| } |
| } |
| |
| /** |
| * Called by HealthListenerMessage when the message is received. |
| */ |
| void callHealthListeners(int listenerId, GemFireHealth.Health newStatus) { |
| HealthListener hl = null; |
| synchronized (this.healthLock) { |
| // Make sure this call was to the current listener |
| if (this.healthListenerId == listenerId) { |
| hl = this.healthListener; |
| } |
| } |
| if (hl != null) { |
| try { |
| hl.healthChanged(this, newStatus); |
| } catch (RuntimeException ignore) { |
| } |
| } |
| } |
| |
| @Override |
| public Config getConfig() { |
| FetchSysCfgResponse response = (FetchSysCfgResponse) sendAndWait(FetchSysCfgRequest.create()); |
| return response.getConfig(); |
| } |
| |
| /** |
| * Returns the runtime {@link org.apache.geode.admin.GemFireMemberStatus} from the vm The idea is |
| * this snapshot is similar to stats that represent the current state of a running VM. However, |
| * this is a bit higher level than a stat |
| */ |
| @Override |
| public GemFireMemberStatus getSnapshot() { |
| RefreshMemberSnapshotResponse response = |
| (RefreshMemberSnapshotResponse) sendAndWait(RefreshMemberSnapshotRequest.create()); |
| return response.getSnapshot(); |
| } |
| |
| /** |
| * Returns the runtime {@link org.apache.geode.admin.RegionSubRegionSnapshot} from the vm The idea |
| * is this snapshot is quickly salvageable to present a cache's region's info |
| */ |
| @Override |
| public RegionSubRegionSnapshot getRegionSnapshot() { |
| RegionSubRegionsSizeResponse response = |
| (RegionSubRegionsSizeResponse) sendAndWait(RegionSubRegionSizeRequest.create()); |
| return response.getSnapshot(); |
| } |
| |
| /** |
| * Updates the configuration of the remote VM. |
| * |
| * @see StoreSysCfgRequest |
| */ |
| @Override |
| public void setConfig(Config cfg) { |
| /* StoreSysCfgResponse response = (StoreSysCfgResponse) */ |
| sendAndWait(StoreSysCfgRequest.create(cfg)); |
| } |
| |
| /** |
| * Returns the agent for the distributed system to which this remote VM belongs. |
| */ |
| @Override |
| public GfManagerAgent getManagerAgent() { |
| return this.agent; |
| } |
| |
| |
| // public String tailSystemLog(){ |
| // TailLogResponse resp = (TailLogResponse)sendAndWait(TailLogRequest.create()); |
| // return resp.getTail(); |
| // } |
| |
| @Override |
| public String[] getSystemLogs() { |
| TailLogResponse resp = (TailLogResponse) sendAndWait(TailLogRequest.create()); |
| String main = resp.getTail(); |
| String child = resp.getChildTail(); |
| String[] retVal = null; |
| if (main != null) { |
| if (child != null) { |
| retVal = new String[] {main, child}; |
| } else { |
| retVal = new String[] {main}; |
| } |
| } else { |
| retVal = new String[0]; |
| } |
| return retVal; |
| } |
| |
| @Override |
| public String getVersionInfo() { |
| VersionInfoResponse resp = (VersionInfoResponse) sendAndWait(VersionInfoRequest.create()); |
| return resp.getVersionInfo(); |
| } |
| |
| /** |
| * Returns information about the root <code>Region</code>s hosted in the remote VM. |
| * |
| * @see RootRegionRequest |
| */ |
| @Override |
| public Region[] getRootRegions() { |
| RootRegionResponse resp = (RootRegionResponse) sendAndWait(RootRegionRequest.create()); |
| return resp.getRegions(this); |
| } |
| |
| @Override |
| public Region getRegion(CacheInfo c, String path) { |
| RegionResponse resp = (RegionResponse) sendAndWait(RegionRequest.createForGet(c, path)); |
| return resp.getRegion(this); |
| } |
| |
| @Override |
| public Region createVMRootRegion(CacheInfo c, String regionPath, RegionAttributes attrs) |
| throws AdminException { |
| |
| RegionResponse resp = |
| (RegionResponse) sendAndWait(RegionRequest.createForCreateRoot(c, regionPath, attrs)); |
| |
| Exception ex = resp.getException(); |
| if (ex != null) { |
| throw new AdminException( |
| String.format("An Exception was thrown while creating VM root region %s", |
| regionPath), |
| ex); |
| } else { |
| return resp.getRegion(this); |
| } |
| } |
| |
| @Override |
| public Region createSubregion(CacheInfo c, String parentPath, String regionPath, |
| RegionAttributes attrs) throws AdminException { |
| |
| RegionResponse resp = (RegionResponse) sendAndWait( |
| RegionRequest.createForCreateSubregion(c, parentPath, regionPath, attrs)); |
| |
| Exception ex = resp.getException(); |
| if (ex != null) { |
| throw new AdminException(String.format("While creating subregion %s of %s", |
| new Object[] {regionPath, parentPath}), ex); |
| } else { |
| return resp.getRegion(this); |
| } |
| } |
| |
| @Override |
| public void setCacheInspectionMode(int mode) { |
| this.cacheInspectionMode = mode; |
| } |
| |
| @Override |
| public int getCacheInspectionMode() { |
| return this.cacheInspectionMode; |
| } |
| |
| /** |
| * Takes a snapshot of a <code>Region</code> hosted in the remote VM. |
| * |
| * @param regionName The name of the <Code>Region</code> |
| * @param snapshotId The sequence number of the snapshot |
| * |
| * @see AppCacheSnapshotMessage |
| */ |
| @Override |
| public void takeRegionSnapshot(String regionName, int snapshotId) { |
| sendAsync(AppCacheSnapshotMessage.create(regionName, snapshotId)); |
| } |
| |
| // public void flushSnapshots() { |
| // sendAsync(FlushAppCacheSnapshotMessage.create()); |
| // } |
| |
| |
| // public boolean hasCache() { |
| // throw new UnsupportedOperationException("Not yet implemented"); |
| // } |
| |
| // additional instance methods |
| RemoteStat[] getResourceStatsByID(long rsrcId) { |
| FetchResourceAttributesResponse response = (FetchResourceAttributesResponse) sendAndWait( |
| FetchResourceAttributesRequest.create(rsrcId)); |
| return response.getStats(); |
| } |
| |
| |
| // void fireCacheCreated(RemoteApplicationProcess app) { |
| // app.setSystemManager(this); |
| // synchronized(this.connectionListeners) { |
| // Iterator iter = this.connectionListeners.iterator(); |
| // while (iter.hasNext()) { |
| // ((ConnectionListener)iter.next()).cacheCreated(this, app); |
| // } |
| // } |
| // } |
| |
| // void fireCacheClosed(RemoteApplicationProcess app) { |
| // app.setSystemManager(this); |
| // synchronized(this.connectionListeners) { |
| // Iterator iter = this.connectionListeners.iterator(); |
| // while (iter.hasNext()) { |
| // ((ConnectionListener)iter.next()).cacheClosed(this, app); |
| // } |
| // } |
| // } |
| |
| @Override |
| public InternalDistributedMember getId() { |
| return this.id; |
| } |
| |
| @Override |
| public CacheInfo getCacheInfo() { |
| CacheInfoResponse resp = (CacheInfoResponse) sendAndWait(CacheInfoRequest.create()); |
| RemoteCacheInfo result = resp.getCacheInfo(); |
| if (result != null) { |
| result.setGemFireVM(this); |
| } |
| return result; |
| } |
| |
| /** |
| * Checks whether a durable-queue for a given client is present on the system member represented |
| * by this RemoteGemFireVM |
| * |
| * @param durableClientId - the 'durable-client-id' for the client |
| * @return - true if the member contains a durable-queue for the given client |
| * |
| * @since GemFire 5.6 |
| */ |
| public boolean hasDurableClient(String durableClientId) { |
| DurableClientInfoResponse resp = |
| (DurableClientInfoResponse) sendAndWait(DurableClientInfoRequest.create(durableClientId, |
| DurableClientInfoRequest.HAS_DURABLE_CLIENT_REQUEST)); |
| boolean result = resp.getResultBoolean(); |
| return result; |
| } |
| |
| /** |
| * Checks whether the system member represented by this RemoteGemFireVM is hosting a primary |
| * durable-queue for the client |
| * |
| * @param durableClientId - the 'durable-client-id' for the client |
| * @return - true if the member contains a primary durable-queue for the given client |
| * |
| * @since GemFire 5.6 |
| */ |
| public boolean isPrimaryForDurableClient(String durableClientId) { |
| DurableClientInfoResponse resp = |
| (DurableClientInfoResponse) sendAndWait(DurableClientInfoRequest.create(durableClientId, |
| DurableClientInfoRequest.IS_PRIMARY_FOR_DURABLE_CLIENT_REQUEST)); |
| boolean result = resp.getResultBoolean(); |
| return result; |
| } |
| |
| @Override |
| public CacheInfo setCacheLockTimeout(CacheInfo c, int v) throws AdminException { |
| return setCacheConfigValue(c, LOCK_TIMEOUT_CODE, v); |
| } |
| |
| @Override |
| public CacheInfo setCacheLockLease(CacheInfo c, int v) throws AdminException { |
| return setCacheConfigValue(c, LOCK_LEASE_CODE, v); |
| } |
| |
| @Override |
| public CacheInfo setCacheSearchTimeout(CacheInfo c, int v) throws AdminException { |
| return setCacheConfigValue(c, SEARCH_TIMEOUT_CODE, v); |
| } |
| |
| @Override |
| public AdminBridgeServer addCacheServer(CacheInfo cache) throws AdminException { |
| |
| BridgeServerRequest request = BridgeServerRequest.createForAdd(cache); |
| BridgeServerResponse response = (BridgeServerResponse) sendAndWait(request); |
| if (response.getException() != null) { |
| Exception ex = response.getException(); |
| throw new AdminException(ex.getMessage(), ex); |
| |
| } else { |
| return response.getBridgeInfo(); |
| } |
| } |
| |
| @Override |
| public AdminBridgeServer getBridgeInfo(CacheInfo cache, int bridgeRef) throws AdminException { |
| |
| BridgeServerRequest request = BridgeServerRequest.createForInfo(cache, bridgeRef); |
| BridgeServerResponse response = (BridgeServerResponse) sendAndWait(request); |
| if (response.getException() != null) { |
| Exception ex = response.getException(); |
| throw new AdminException(ex.getMessage(), ex); |
| |
| } else { |
| return response.getBridgeInfo(); |
| } |
| } |
| |
| @Override |
| public AdminBridgeServer startBridgeServer(CacheInfo cache, AdminBridgeServer bridge) |
| throws AdminException { |
| |
| BridgeServerRequest request = |
| BridgeServerRequest.createForStart(cache, (RemoteBridgeServer) bridge); |
| BridgeServerResponse response = (BridgeServerResponse) sendAndWait(request); |
| if (response.getException() != null) { |
| Exception ex = response.getException(); |
| throw new AdminException(ex.getMessage(), ex); |
| |
| } else { |
| return response.getBridgeInfo(); |
| } |
| } |
| |
| @Override |
| public AdminBridgeServer stopBridgeServer(CacheInfo cache, AdminBridgeServer bridge) |
| throws AdminException { |
| |
| BridgeServerRequest request = |
| BridgeServerRequest.createForStop(cache, (RemoteBridgeServer) bridge); |
| BridgeServerResponse response = (BridgeServerResponse) sendAndWait(request); |
| if (response.getException() != null) { |
| Exception ex = response.getException(); |
| throw new AdminException(ex.getMessage(), ex); |
| |
| } else { |
| return response.getBridgeInfo(); |
| } |
| } |
| |
| |
| static final int LOCK_TIMEOUT_CODE = 1; |
| static final int LOCK_LEASE_CODE = 2; |
| static final int SEARCH_TIMEOUT_CODE = 3; |
| |
| /** |
| * Changes a Cache configuration value in a remote cache by sending the remote member a |
| * {@link CacheConfigRequest}. |
| * |
| * @param c The remote cache to be changed |
| * @param opCode The code of the operation to perform |
| * @param value A value that is an argument to the operation |
| * |
| * @throws AdminException If a problem is encountered in the remote VM while changing the |
| * configuration. |
| */ |
| private CacheInfo setCacheConfigValue(CacheInfo c, int opCode, int value) throws AdminException { |
| |
| if (c.isClosed()) { |
| return c; |
| } |
| CacheConfigResponse resp = |
| (CacheConfigResponse) sendAndWait(CacheConfigRequest.create(c, opCode, value)); |
| if (resp.getException() != null) { |
| Exception ex = resp.getException(); |
| throw new AdminException(ex.getMessage(), ex); |
| |
| } else if (resp.getCacheInfo() == null) { |
| c.setClosed(); |
| return c; |
| |
| } else { |
| return resp.getCacheInfo(); |
| } |
| } |
| |
| /** |
| * Stops listening for statistics updates. Invoked when this <code>GemFireVM</code> disconnects or |
| * when this member leaves the distributed system. |
| */ |
| public void stopStatListening() { |
| synchronized (this.statListenersLock) { |
| this.statListeners = new ListenerIdMap(); // we don't provide a way to empty a ListenerIdMap |
| unreachable = true; |
| } |
| dispatcher.stopDispatching(); |
| } |
| |
| /** |
| * Invoked by the {@link RemoteGfManagerAgent} when the member that this <code>GemFireVM</code> |
| * represents has left the distributed system. |
| * |
| * @param alertListenerRegistered Was there an alert listener registered for this |
| * <code>GemFireVM</code>'s agent? If so, the alert listeners should be removed in the |
| * member VMs. |
| * |
| * @see AdminConsoleDisconnectMessage |
| */ |
| public void disconnect(boolean alertListenerRegistered) { |
| // Thread.dumpStack(); |
| try { |
| AdminConsoleDisconnectMessage msg = AdminConsoleDisconnectMessage.create(); |
| msg.setAlertListenerExpected(alertListenerRegistered); |
| msg.setCrashed(false); |
| sendAsync(msg); |
| } finally { |
| stopStatListening(); |
| } |
| } |
| |
| |
| @Override |
| public void setInspectionClasspath(String classpath) { |
| this.inspectionClasspath = classpath; |
| } |
| |
| @Override |
| public String getInspectionClasspath() { |
| return this.inspectionClasspath; |
| } |
| |
| // inner classes |
| |
| /** |
| * A daemon thread that reads org.apache.geode.internal.admin.remote.RemoteGemFireVM.DispatchArgs |
| * off of a queue and delivers callbacks to the appropriate |
| * {@link org.apache.geode.internal.admin.StatListener}. |
| */ |
| private class StatDispatcher extends LoggingThread { |
| private BlockingQueue queue = new LinkedBlockingQueue(); |
| private volatile boolean stopped = false; |
| |
| protected StatDispatcher() { |
| super("StatDispatcher"); |
| } |
| |
| protected synchronized void stopDispatching() { |
| this.stopped = true; |
| this.interrupt(); |
| } |
| |
| @Override // GemStoneAddition |
| public void run() { |
| while (!stopped) { |
| SystemFailure.checkFailure(); |
| try { |
| DispatchArgs args = (DispatchArgs) queue.take(); |
| internalCallStatListeners(args.timestamp, args.listenerIds, args.values); |
| |
| } catch (InterruptedException ex) { |
| // No need to reset the interrupt bit, we'll just exit. |
| break; |
| |
| } catch (Exception ignore) { |
| } |
| } |
| } |
| |
| protected void put(DispatchArgs args) { |
| for (;;) { |
| RemoteGemFireVM.this.agent.getDM().getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| queue.put(args); |
| break; |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } |
| } |
| |
| /** |
| * Encapsulates an update to several statistics |
| */ |
| private static class DispatchArgs { |
| protected final long timestamp; |
| protected final int[] listenerIds; |
| protected final double[] values; |
| |
| /** |
| * Creates a new <code>DispatchArgs</code> |
| * |
| * @param timestamp The time at which the statistics were sampled |
| * @param listenerIds The ids of the <code>StatListener</code>s on which callbacks should be |
| * invoked. |
| * @param values The new values of the statistics |
| */ |
| protected DispatchArgs(long timestamp, int[] listenerIds, double[] values) { |
| this.timestamp = timestamp; |
| this.listenerIds = listenerIds; |
| this.values = values; |
| } |
| } |
| |
| /** |
| * Sends an AdminRequest to this dm (that is, to member of the distributed system represented by |
| * this <code>RemoteGemFireVM</code>) and waits for the AdminReponse. |
| */ |
| AdminResponse sendAndWait(AdminRequest msg) { |
| if (unreachable) { |
| throw new OperationCancelledException( |
| String.format("%s is unreachable. It has either left or crashed.", |
| this.name)); |
| } |
| if (this.id == null) { |
| throw new NullPointerException( |
| "The id of this RemoteGemFireVM is null!"); |
| } |
| msg.setRecipient(this.id); |
| msg.setModifiedClasspath(inspectionClasspath); |
| return agent.sendAndWait(msg); |
| } |
| |
| /** |
| * Sends a message to this dm (that is, to member of the distributed system represented by this |
| * <code>RemoteGemFireVM</code>) and does not wait for a response |
| */ |
| void sendAsync(DistributionMessage msg) { |
| msg.setRecipient(id); |
| if (msg instanceof AdminRequest) { |
| ((AdminRequest) msg).setModifiedClasspath(inspectionClasspath); |
| } |
| agent.sendAsync(msg); |
| } |
| |
| /** |
| * This method should be used to set the Alerts Manager for the member agent. Stat Alerts |
| * Aggregator would use this method to set stat Alerts Manager with the available alert |
| * definitions and the refresh interval set for each member joining the distributed system. |
| * |
| * @param alertDefs Stat Alert Definitions to set for the Alerts Manager |
| * @param refreshInterval refresh interval to be used by the Alerts Manager |
| * @param setRemotely whether to be set on remote VM |
| */ |
| @Override |
| public void setAlertsManager(StatAlertDefinition[] alertDefs, long refreshInterval, |
| boolean setRemotely) { |
| if (setRemotely) { |
| // TODO: is the check for valid AdminResponse required |
| sendAsync(StatAlertsManagerAssignMessage.create(alertDefs, refreshInterval)); |
| } |
| } |
| |
| /** |
| * This method would be used to set refresh interval for the GemFireVM. This method would mostly |
| * be called on each member after initial set up whenever the refresh interval is changed. |
| * |
| * @param refreshInterval refresh interval to set (in milliseconds) |
| */ |
| @Override |
| public void setRefreshInterval(long refreshInterval) { |
| sendAsync(ChangeRefreshIntervalMessage.create(refreshInterval)); |
| } |
| |
| /** |
| * This method would be used to set Sta Alert Definitions for the GemFireVM. This method would |
| * mostly be called on each member after initial set up whenever one or more Stat Alert |
| * Definitions get added/updated/removed. |
| * |
| * @param alertDefs an array of StaAlertDefinition objects |
| * @param actionCode one of UpdateAlertDefinitionRequest.ADD_ALERT_DEFINITION, |
| * UpdateAlertDefinitionRequestUPDATE_ALERT_DEFINITION, |
| * UpdateAlertDefinitionRequest.REMOVE_ALERT_DEFINITION |
| */ |
| @Override |
| public void updateAlertDefinitions(StatAlertDefinition[] alertDefs, int actionCode) { |
| // TODO: is the check for valid AdminResponse required |
| sendAsync(UpdateAlertDefinitionMessage.create(alertDefs, actionCode)); |
| } |
| |
| } |