| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.catalina.tribes.group; |
| |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.management.MBeanServer; |
| import javax.management.ObjectName; |
| |
| import org.apache.catalina.tribes.ByteMessage; |
| import org.apache.catalina.tribes.ChannelException; |
| import org.apache.catalina.tribes.ChannelInterceptor; |
| import org.apache.catalina.tribes.ChannelListener; |
| import org.apache.catalina.tribes.ChannelMessage; |
| import org.apache.catalina.tribes.ChannelReceiver; |
| import org.apache.catalina.tribes.ChannelSender; |
| import org.apache.catalina.tribes.ErrorHandler; |
| import org.apache.catalina.tribes.Heartbeat; |
| import org.apache.catalina.tribes.JmxChannel; |
| import org.apache.catalina.tribes.ManagedChannel; |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.MembershipListener; |
| import org.apache.catalina.tribes.MembershipService; |
| import org.apache.catalina.tribes.RemoteProcessException; |
| import org.apache.catalina.tribes.UniqueId; |
| import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; |
| import org.apache.catalina.tribes.io.BufferPool; |
| import org.apache.catalina.tribes.io.ChannelData; |
| import org.apache.catalina.tribes.io.XByteBuffer; |
| import org.apache.catalina.tribes.jmx.JmxRegistry; |
| import org.apache.catalina.tribes.util.Arrays; |
| import org.apache.catalina.tribes.util.Logs; |
| import org.apache.catalina.tribes.util.StringManager; |
| import org.apache.juli.logging.Log; |
| import org.apache.juli.logging.LogFactory; |
| |
| /** |
| * The default implementation of a Channel.<br> |
| * The GroupChannel manages the replication channel. It coordinates message being sent and received with membership |
| * announcements. The channel has a chain of interceptors that can modify the message or perform other logic.<br> |
| * It manages a complete group, both membership and replication. |
| */ |
| public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel, JmxChannel, GroupChannelMBean { |
| |
| private static final Log log = LogFactory.getLog(GroupChannel.class); |
| protected static final StringManager sm = StringManager.getManager(GroupChannel.class); |
| |
| /** |
| * Flag to determine if the channel manages its own heartbeat If set to true, the channel will start a local thread |
| * for the heart beat. |
| */ |
| protected boolean heartbeat = true; |
| |
| /** |
| * If <code>heartbeat == true</code> then how often do we want this heartbeat to run. The default value is 5000 |
| * milliseconds. |
| */ |
| protected long heartbeatSleeptime = 5 * 1000; |
| |
| /** |
| * Internal heartbeat future |
| */ |
| protected ScheduledFuture<?> heartbeatFuture = null; |
| protected ScheduledFuture<?> monitorFuture; |
| |
| /** |
| * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br> |
| * - MembershipService<br> |
| * - ChannelSender <br> |
| * - ChannelReceiver<br> |
| */ |
| protected final ChannelCoordinator coordinator = new ChannelCoordinator(); |
| |
| /** |
| * The first interceptor in the interceptor stack. The interceptors are chained in a linked list, so we only need a |
| * reference to the first one |
| */ |
| protected ChannelInterceptor interceptors = null; |
| |
| /** |
| * A list of membership listeners that subscribe to membership announcements |
| */ |
| protected final List<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>(); |
| |
| /** |
| * A list of channel listeners that subscribe to incoming messages |
| */ |
| protected final List<ChannelListener> channelListeners = new CopyOnWriteArrayList<>(); |
| |
| /** |
| * If set to true, the GroupChannel will check to make sure that |
| */ |
| protected boolean optionCheck = false; |
| |
| /** |
| * the name of this channel. |
| */ |
| protected String name = null; |
| |
| /** |
| * the jmx domain which this channel is registered. |
| */ |
| private String jmxDomain = "ClusterChannel"; |
| |
| /** |
| * the jmx prefix which will be used with channel ObjectName. |
| */ |
| private String jmxPrefix = ""; |
| |
| /** |
| * If set to true, this channel is registered with jmx. |
| */ |
| private boolean jmxEnabled = true; |
| |
| /** |
| * Executor service. |
| */ |
| protected ScheduledExecutorService utilityExecutor = null; |
| |
| /** |
| * the ObjectName of this channel. |
| */ |
| private ObjectName oname = null; |
| |
| /** |
| * Creates a GroupChannel. This constructor will also add the first interceptor in the GroupChannel.<br> |
| * The first interceptor is always the channel itself. |
| */ |
| public GroupChannel() { |
| addInterceptor(this); |
| } |
| |
| |
| @Override |
| public void addInterceptor(ChannelInterceptor interceptor) { |
| if (interceptors == null) { |
| interceptors = interceptor; |
| interceptors.setNext(coordinator); |
| interceptors.setPrevious(null); |
| coordinator.setPrevious(interceptors); |
| } else { |
| ChannelInterceptor last = interceptors; |
| while (last.getNext() != coordinator) { |
| last = last.getNext(); |
| } |
| last.setNext(interceptor); |
| interceptor.setNext(coordinator); |
| interceptor.setPrevious(last); |
| coordinator.setPrevious(interceptor); |
| } |
| } |
| |
| /** |
| * Sends a heartbeat through the interceptor stack.<br> |
| * Invoke this method from the application on a periodic basis if you have turned off internal heartbeats |
| * <code>channel.setHeartbeat(false)</code> |
| */ |
| @Override |
| public void heartbeat() { |
| super.heartbeat(); |
| |
| for (MembershipListener listener : membershipListeners) { |
| if (listener instanceof Heartbeat) { |
| ((Heartbeat) listener).heartbeat(); |
| } |
| } |
| |
| for (ChannelListener listener : channelListeners) { |
| if (listener instanceof Heartbeat) { |
| ((Heartbeat) listener).heartbeat(); |
| } |
| } |
| } |
| |
| |
| @Override |
| public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException { |
| return send(destination, msg, options, null); |
| } |
| |
| @Override |
| public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) |
| throws ChannelException { |
| if (msg == null) { |
| throw new ChannelException(sm.getString("groupChannel.nullMessage")); |
| } |
| XByteBuffer buffer = null; |
| try { |
| if (destination == null || destination.length == 0) { |
| throw new ChannelException(sm.getString("groupChannel.noDestination")); |
| } |
| ChannelData data = new ChannelData(true);// generates a unique Id |
| data.setAddress(getLocalMember(false)); |
| data.setTimestamp(System.currentTimeMillis()); |
| byte[] b; |
| if (msg instanceof ByteMessage) { |
| b = ((ByteMessage) msg).getMessage(); |
| options = options | SEND_OPTIONS_BYTE_MESSAGE; |
| } else { |
| b = XByteBuffer.serialize(msg); |
| options = options & (~SEND_OPTIONS_BYTE_MESSAGE); |
| } |
| data.setOptions(options); |
| // XByteBuffer buffer = new XByteBuffer(b.length+128,false); |
| buffer = BufferPool.getBufferPool().getBuffer(b.length + 128, false); |
| buffer.append(b, 0, b.length); |
| data.setMessage(buffer); |
| InterceptorPayload payload = null; |
| if (handler != null) { |
| payload = new InterceptorPayload(); |
| payload.setErrorHandler(handler); |
| } |
| getFirstInterceptor().sendMessage(destination, data, payload); |
| if (Logs.MESSAGES.isTraceEnabled()) { |
| Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " + |
| new java.sql.Timestamp(System.currentTimeMillis()) + " to " + Arrays.toNameString(destination)); |
| Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " + msg); |
| } |
| |
| return new UniqueId(data.getUniqueId()); |
| } catch (RuntimeException | IOException e) { |
| throw new ChannelException(e); |
| } finally { |
| if (buffer != null) { |
| BufferPool.getBufferPool().returnBuffer(buffer); |
| } |
| } |
| } |
| |
| |
| /** |
| * Callback from the interceptor stack. <br> |
| * When a message is received from a remote node, this method will be invoked by the previous interceptor.<br> |
| * This method can also be used to send a message to other components within the same application, but it's an |
| * extreme case, and you're probably better off doing that logic between the applications itself. |
| * |
| * @param msg ChannelMessage |
| */ |
| @Override |
| public void messageReceived(ChannelMessage msg) { |
| if (msg == null) { |
| return; |
| } |
| try { |
| if (Logs.MESSAGES.isTraceEnabled()) { |
| Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " + |
| new java.sql.Timestamp(System.currentTimeMillis()) + " from " + msg.getAddress().getName()); |
| } |
| |
| Serializable fwd; |
| if ((msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE) { |
| fwd = new ByteMessage(msg.getMessage().getBytes()); |
| } else { |
| try { |
| fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength()); |
| } catch (Exception e) { |
| log.error(sm.getString("groupChannel.unable.deserialize", msg), e); |
| return; |
| } |
| } |
| if (Logs.MESSAGES.isTraceEnabled()) { |
| Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " + fwd); |
| } |
| |
| // get the actual member with the correct alive time |
| Member source = msg.getAddress(); |
| boolean rx = false; |
| boolean delivered = false; |
| for (ChannelListener channelListener : channelListeners) { |
| if (channelListener != null && channelListener.accept(fwd, source)) { |
| channelListener.messageReceived(fwd, source); |
| delivered = true; |
| // if the message was accepted by an RPC channel, that channel |
| // is responsible for returning the reply, otherwise we send an absence reply |
| if (channelListener instanceof RpcChannel) { |
| rx = true; |
| } |
| } |
| } // for |
| if ((!rx) && (fwd instanceof RpcMessage)) { |
| // if we have a message that requires a response, |
| // but none was given, send back an immediate one |
| sendNoRpcChannelReply((RpcMessage) fwd, source); |
| } |
| if (Logs.MESSAGES.isTraceEnabled()) { |
| Logs.MESSAGES.trace("GroupChannel delivered[" + delivered + "] id:" + new UniqueId(msg.getUniqueId())); |
| } |
| |
| } catch (Exception e) { |
| // this could be the channel listener throwing an exception, we should log it |
| // as a warning. |
| if (log.isWarnEnabled()) { |
| log.warn(sm.getString("groupChannel.receiving.error"), e); |
| } |
| throw new RemoteProcessException(sm.getString("groupChannel.receiving.error"), e); |
| } |
| } |
| |
| /** |
| * Sends a <code>NoRpcChannelReply</code> message to a member<br> |
| * This method gets invoked by the channel if an RPC message comes in and no channel listener accepts the message. |
| * This avoids timeout |
| * |
| * @param msg RpcMessage |
| * @param destination Member - the destination for the reply |
| */ |
| protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) { |
| try { |
| // avoid circular loop |
| if (msg instanceof RpcMessage.NoRpcChannelReply) { |
| return; |
| } |
| RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId, msg.uuid); |
| send(new Member[] { destination }, reply, SEND_OPTIONS_ASYNCHRONOUS); |
| } catch (Exception e) { |
| log.error(sm.getString("groupChannel.sendFail.noRpcChannelReply"), e); |
| } |
| } |
| |
| /** |
| * memberAdded gets invoked by the interceptor below the channel and the channel will broadcast it to the membership |
| * listeners |
| * |
| * @param member Member - the new member |
| */ |
| @Override |
| public void memberAdded(Member member) { |
| // notify upwards |
| for (MembershipListener membershipListener : membershipListeners) { |
| if (membershipListener != null) { |
| membershipListener.memberAdded(member); |
| } |
| } |
| } |
| |
| /** |
| * memberDisappeared gets invoked by the interceptor below the channel and the channel will broadcast it to the |
| * membership listeners |
| * |
| * @param member Member - the member that left or crashed |
| */ |
| @Override |
| public void memberDisappeared(Member member) { |
| // notify upwards |
| for (MembershipListener membershipListener : membershipListeners) { |
| if (membershipListener != null) { |
| membershipListener.memberDisappeared(member); |
| } |
| } |
| } |
| |
| /** |
| * Sets up the default implementation interceptor stack if no interceptors have been added |
| * |
| * @throws ChannelException Cluster error |
| */ |
| protected synchronized void setupDefaultStack() throws ChannelException { |
| if (getFirstInterceptor() != null && ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) { |
| addInterceptor(new MessageDispatchInterceptor()); |
| } |
| Iterator<ChannelInterceptor> interceptors = getInterceptors(); |
| while (interceptors.hasNext()) { |
| ChannelInterceptor channelInterceptor = interceptors.next(); |
| channelInterceptor.setChannel(this); |
| } |
| coordinator.setChannel(this); |
| } |
| |
| /** |
| * Validates the option flags that each interceptor is using and reports an error if two interceptor share the same |
| * flag. |
| * |
| * @throws ChannelException Error with option flag |
| */ |
| protected void checkOptionFlags() throws ChannelException { |
| StringBuilder conflicts = new StringBuilder(); |
| ChannelInterceptor first = interceptors; |
| while (first != null) { |
| int flag = first.getOptionFlag(); |
| if (flag != 0) { |
| ChannelInterceptor next = first.getNext(); |
| while (next != null) { |
| int nflag = next.getOptionFlag(); |
| if (nflag != 0 && (((flag & nflag) == flag) || ((flag & nflag) == nflag))) { |
| conflicts.append('['); |
| conflicts.append(first.getClass().getName()); |
| conflicts.append(':'); |
| conflicts.append(flag); |
| conflicts.append(" == "); |
| conflicts.append(next.getClass().getName()); |
| conflicts.append(':'); |
| conflicts.append(nflag); |
| conflicts.append("] "); |
| } // end if |
| next = next.getNext(); |
| } // while |
| } // end if |
| first = first.getNext(); |
| } // while |
| if (!conflicts.isEmpty()) { |
| throw new ChannelException(sm.getString("groupChannel.optionFlag.conflict", conflicts.toString())); |
| } |
| |
| } |
| |
| protected boolean ownExecutor = false; |
| |
| @Override |
| public synchronized void start(int svc) throws ChannelException { |
| setupDefaultStack(); |
| if (optionCheck) { |
| checkOptionFlags(); |
| } |
| // register jmx |
| JmxRegistry jmxRegistry = JmxRegistry.getRegistry(this); |
| if (jmxRegistry != null) { |
| this.oname = jmxRegistry.registerJmx(",component=Channel", this); |
| } |
| if (utilityExecutor == null) { |
| log.warn(sm.getString("groupChannel.warn.noUtilityExecutor")); |
| utilityExecutor = new ScheduledThreadPoolExecutor(1); |
| ownExecutor = true; |
| } |
| super.start(svc); |
| monitorFuture = utilityExecutor.scheduleWithFixedDelay(this::startHeartbeat, 0, 60, TimeUnit.SECONDS); |
| } |
| |
| protected void startHeartbeat() { |
| if (heartbeat && (heartbeatFuture == null || heartbeatFuture.isDone())) { |
| if (heartbeatFuture != null && heartbeatFuture.isDone()) { |
| // There was an error executing the scheduled task, get it and log it |
| try { |
| heartbeatFuture.get(); |
| } catch (InterruptedException | ExecutionException e) { |
| log.error(sm.getString("groupChannel.unable.sendHeartbeat"), e); |
| } |
| } |
| heartbeatFuture = utilityExecutor.scheduleWithFixedDelay(new HeartbeatRunnable(), heartbeatSleeptime, |
| heartbeatSleeptime, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| @Override |
| public synchronized void stop(int svc) throws ChannelException { |
| if (monitorFuture != null) { |
| monitorFuture.cancel(true); |
| monitorFuture = null; |
| } |
| if (heartbeatFuture != null) { |
| heartbeatFuture.cancel(true); |
| heartbeatFuture = null; |
| } |
| super.stop(svc); |
| if (ownExecutor) { |
| utilityExecutor.shutdown(); |
| utilityExecutor = null; |
| ownExecutor = false; |
| } |
| if (oname != null) { |
| JmxRegistry.getRegistry(this).unregisterJmx(oname); |
| oname = null; |
| } |
| } |
| |
| /** |
| * Returns the first interceptor of the stack. Useful for traversal. |
| * |
| * @return ChannelInterceptor |
| */ |
| public ChannelInterceptor getFirstInterceptor() { |
| return Objects.requireNonNullElse(interceptors, coordinator); |
| } |
| |
| @Override |
| public ScheduledExecutorService getUtilityExecutor() { |
| return utilityExecutor; |
| } |
| |
| @Override |
| public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) { |
| this.utilityExecutor = utilityExecutor; |
| } |
| |
| @Override |
| public ChannelReceiver getChannelReceiver() { |
| return coordinator.getClusterReceiver(); |
| } |
| |
| @Override |
| public ChannelSender getChannelSender() { |
| return coordinator.getClusterSender(); |
| } |
| |
| @Override |
| public MembershipService getMembershipService() { |
| return coordinator.getMembershipService(); |
| } |
| |
| @Override |
| public void setChannelReceiver(ChannelReceiver clusterReceiver) { |
| coordinator.setClusterReceiver(clusterReceiver); |
| } |
| |
| @Override |
| public void setChannelSender(ChannelSender clusterSender) { |
| coordinator.setClusterSender(clusterSender); |
| } |
| |
| @Override |
| public void setMembershipService(MembershipService membershipService) { |
| coordinator.setMembershipService(membershipService); |
| } |
| |
| @Override |
| public void addMembershipListener(MembershipListener membershipListener) { |
| if (!this.membershipListeners.contains(membershipListener)) { |
| this.membershipListeners.add(membershipListener); |
| } |
| } |
| |
| @Override |
| public void removeMembershipListener(MembershipListener membershipListener) { |
| membershipListeners.remove(membershipListener); |
| } |
| |
| @Override |
| public void addChannelListener(ChannelListener channelListener) { |
| if (!this.channelListeners.contains(channelListener)) { |
| this.channelListeners.add(channelListener); |
| } else { |
| throw new IllegalArgumentException(sm.getString("groupChannel.listener.alreadyExist", channelListener, |
| channelListener.getClass().getName())); |
| } |
| } |
| |
| @Override |
| public void removeChannelListener(ChannelListener channelListener) { |
| channelListeners.remove(channelListener); |
| } |
| |
| @Override |
| public Iterator<ChannelInterceptor> getInterceptors() { |
| return new InterceptorIterator(this.getNext(), this.coordinator); |
| } |
| |
| /** |
| * Enables/disables the option check<br> |
| * Setting this to true, will make the GroupChannel perform a conflict check on the interceptors. If two |
| * interceptors are using the same option flag and throw an error upon start. |
| * |
| * @param optionCheck boolean |
| */ |
| public void setOptionCheck(boolean optionCheck) { |
| this.optionCheck = optionCheck; |
| } |
| |
| /** |
| * Configure local heartbeat sleep time<br> |
| * Only used when <code>getHeartbeat()==true</code> |
| * |
| * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats |
| */ |
| public void setHeartbeatSleeptime(long heartbeatSleeptime) { |
| this.heartbeatSleeptime = heartbeatSleeptime; |
| } |
| |
| /** |
| * Enables or disables local heartbeat. if <code>setHeartbeat(true)</code> is invoked then the channel will start an |
| * internal thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds |
| * |
| * @param heartbeat boolean |
| */ |
| @Override |
| public void setHeartbeat(boolean heartbeat) { |
| this.heartbeat = heartbeat; |
| } |
| |
| @Override |
| public boolean getOptionCheck() { |
| return optionCheck; |
| } |
| |
| @Override |
| public boolean getHeartbeat() { |
| return heartbeat; |
| } |
| |
| /** |
| * @return the sleep time in milliseconds that the internal heartbeat will sleep in between invocations of |
| * <code>Channel.heartbeat()</code> |
| */ |
| @Override |
| public long getHeartbeatSleeptime() { |
| return heartbeatSleeptime; |
| } |
| |
| @Override |
| public String getName() { |
| return name; |
| } |
| |
| @Override |
| public void setName(String name) { |
| this.name = name; |
| } |
| |
| @Override |
| public boolean isJmxEnabled() { |
| return jmxEnabled; |
| } |
| |
| @Override |
| public void setJmxEnabled(boolean jmxEnabled) { |
| this.jmxEnabled = jmxEnabled; |
| } |
| |
| @Override |
| public String getJmxDomain() { |
| return jmxDomain; |
| } |
| |
| @Override |
| public void setJmxDomain(String jmxDomain) { |
| this.jmxDomain = jmxDomain; |
| } |
| |
| @Override |
| public String getJmxPrefix() { |
| return jmxPrefix; |
| } |
| |
| @Override |
| public void setJmxPrefix(String jmxPrefix) { |
| this.jmxPrefix = jmxPrefix; |
| } |
| |
| @Override |
| public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { |
| // NOOP |
| return null; |
| } |
| |
| @Override |
| public void postRegister(Boolean registrationDone) { |
| // NOOP |
| } |
| |
| @Override |
| public void preDeregister() throws Exception { |
| // NOOP |
| } |
| |
| @Override |
| public void postDeregister() { |
| JmxRegistry.removeRegistry(this, true); |
| } |
| |
| /** |
| * An iterator to loop through the interceptors in a channel. |
| */ |
| public static class InterceptorIterator implements Iterator<ChannelInterceptor> { |
| private final ChannelInterceptor end; |
| private ChannelInterceptor start; |
| |
| public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) { |
| this.end = end; |
| this.start = start; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return start != null && start != end; |
| } |
| |
| @Override |
| public ChannelInterceptor next() { |
| ChannelInterceptor result = null; |
| if (hasNext()) { |
| result = start; |
| start = start.getNext(); |
| } |
| return result; |
| } |
| |
| @Override |
| public void remove() { |
| // empty operation |
| } |
| } |
| |
| /** |
| * <p> |
| * Title: Internal heartbeat runnable |
| * </p> |
| * <p> |
| * Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class is created |
| * </p> |
| */ |
| public class HeartbeatRunnable implements Runnable { |
| @Override |
| public void run() { |
| heartbeat(); |
| } |
| } |
| |
| } |