| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.distributed.internal.membership.gms.messenger; |
| |
| import static org.apache.geode.distributed.internal.membership.gms.GMSUtil.replaceStrings; |
| import static org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage.ALL_RECIPIENTS; |
| import static org.apache.geode.internal.serialization.DataSerializableFixedID.FIND_COORDINATOR_REQ; |
| import static org.apache.geode.internal.serialization.DataSerializableFixedID.FIND_COORDINATOR_RESP; |
| import static org.apache.geode.internal.serialization.DataSerializableFixedID.JOIN_REQUEST; |
| import static org.apache.geode.internal.serialization.DataSerializableFixedID.JOIN_RESPONSE; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.lang.reflect.Field; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.stream.Collectors; |
| |
| import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; |
| import org.apache.logging.log4j.Logger; |
| import org.jgroups.Address; |
| import org.jgroups.Channel; |
| import org.jgroups.Event; |
| import org.jgroups.JChannel; |
| import org.jgroups.Message; |
| import org.jgroups.Message.Flag; |
| import org.jgroups.Message.TransientFlag; |
| import org.jgroups.Receiver; |
| import org.jgroups.ReceiverAdapter; |
| import org.jgroups.View; |
| import org.jgroups.ViewId; |
| import org.jgroups.conf.ClassConfigurator; |
| import org.jgroups.protocols.UDP; |
| import org.jgroups.protocols.pbcast.NAKACK2; |
| import org.jgroups.protocols.pbcast.NakAckHeader2; |
| import org.jgroups.stack.IpAddress; |
| import org.jgroups.util.Digest; |
| import org.jgroups.util.UUID; |
| |
| import org.apache.geode.ForcedDisconnectException; |
| import org.apache.geode.GemFireConfigException; |
| import org.apache.geode.GemFireIOException; |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.SystemConnectException; |
| import org.apache.geode.alerting.internal.spi.AlertingAction; |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.membership.gms.GMSMemberData; |
| import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView; |
| import org.apache.geode.distributed.internal.membership.gms.GMSUtil; |
| import org.apache.geode.distributed.internal.membership.gms.Services; |
| import org.apache.geode.distributed.internal.membership.gms.api.MemberData; |
| import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier; |
| import org.apache.geode.distributed.internal.membership.gms.api.MembershipConfig; |
| import org.apache.geode.distributed.internal.membership.gms.api.MembershipStatistics; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.GMSMessage; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler; |
| import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger; |
| import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest; |
| import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorResponse; |
| import org.apache.geode.distributed.internal.membership.gms.messages.JoinRequestMessage; |
| import org.apache.geode.distributed.internal.membership.gms.messages.JoinResponseMessage; |
| import org.apache.geode.internal.ClassPathLoader; |
| import org.apache.geode.internal.OSProcess; |
| import org.apache.geode.internal.cache.DistributedCacheOperation; |
| import org.apache.geode.internal.net.SocketCreator; |
| import org.apache.geode.internal.serialization.BufferDataOutputStream; |
| import org.apache.geode.internal.serialization.StaticSerialization; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.serialization.VersionedDataInputStream; |
| import org.apache.geode.internal.tcp.MemberShunnedException; |
| |
| @SuppressWarnings("StatementWithEmptyBody") |
| public class JGroupsMessenger implements Messenger { |
| |
| private static final Logger logger = Services.getLogger(); |
| |
| /** |
| * The location (in the product) of the non-mcast Jgroups config file. |
| */ |
| private static final String DEFAULT_JGROUPS_TCP_CONFIG = |
| "org/apache/geode/distributed/internal/membership/gms/messenger/jgroups-config.xml"; |
| |
| /** |
| * The location (in the product) of the mcast Jgroups config file. |
| */ |
| private static final String JGROUPS_MCAST_CONFIG_FILE_NAME = |
| "org/apache/geode/distributed/internal/membership/gms/messenger/jgroups-mcast.xml"; |
| |
| /** JG magic numbers for types added to the JG ClassConfigurator */ |
| private static final short JGROUPS_TYPE_JGADDRESS = 2000; |
| private static final short JGROUPS_PROTOCOL_TRANSPORT = 1000; |
| |
| @MutableForTesting |
| public static boolean THROW_EXCEPTION_ON_START_HOOK; |
| |
| protected String jgStackConfig; |
| |
| JChannel myChannel; |
| MemberIdentifier localAddress; |
| JGAddress jgAddress; |
| private Services services; |
| |
| /** handlers that receive certain classes of messages instead of the Manager */ |
| private final Map<Class, MessageHandler> handlers = new ConcurrentHashMap<>(); |
| |
| private volatile GMSMembershipView view; |
| |
| protected final GMSPingPonger pingPonger = new GMSPingPonger(); |
| |
| protected final AtomicLong pongsReceived = new AtomicLong(0); |
| |
| /** tracks multicast messages that have been scheduled for processing */ |
| protected final Map<MemberIdentifier, MessageTracker> scheduledMcastSeqnos = new HashMap<>(); |
| |
| protected short nackack2HeaderId; |
| |
| /** |
| * A set that contains addresses that we have logged JGroups IOExceptions for in the current |
| * membership view and possibly initiated suspect processing. This reduces the amount of suspect |
| * processing initiated by IOExceptions and the amount of exceptions logged |
| */ |
| private final Set<Address> addressesWithIoExceptionsProcessed = |
| Collections.synchronizedSet(new HashSet<Address>()); |
| |
| static { |
| // Register the Geode-specific JGroups classes with the JGroups ClassConfigurator: |
| // JGAddress under a type id and the Transport protocol under a protocol id. |
| ClassConfigurator.add(JGROUPS_TYPE_JGADDRESS, JGAddress.class); |
| ClassConfigurator.addProtocol(JGROUPS_PROTOCOL_TRANSPORT, Transport.class); |
| } |
| |
| private GMSEncrypt encrypt; |
| |
| /** |
| * Member identifiers already used, either in this JGroupsMessenger instance |
| * or in a past one & retained through an auto-reconnect. |
| */ |
| private Set<MemberIdentifier> usedMemberIdentifiers = new HashSet<>(); |
| |
| /** |
| * During reconnect a QuorumChecker holds the JGroups channel and responds to Ping |
| * and Pong messages but also queues any messages it doesn't recognize. These need |
| * to be delivered to handlers after membership services have been rebuilt. |
| */ |
| private Queue<Message> queuedMessagesFromReconnect; |
| |
| /** |
| * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible |
| * for deserializing and dispatching those messages to the appropriate handler |
| */ |
| private JGroupsReceiver jgroupsReceiver; |
| |
| public static void setChannelReceiver(JChannel channel, Receiver r) { |
| try { |
| // Channel.setReceiver() will issue a warning if we try to set a new receiver |
| // and the channel already has one. Rather than set the receiver to null & |
| // then establish a new one we use reflection to set the channel receiver. See GEODE-7220 |
| Field receiver = Channel.class.getDeclaredField("receiver"); |
| receiver.setAccessible(true); |
| receiver.set(channel, r); |
| } catch (NoSuchFieldException | IllegalAccessException e) { |
| throw new InternalGemFireException("unable to establish a JGroups receiver", e); |
| } |
| } |
| |
| @Override |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings( |
| value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") |
| public void init(Services s) { |
| this.services = s; |
| |
| MembershipConfig config = services.getConfig(); |
| |
| |
| boolean enableNetworkPartitionDetection = config.getEnableNetworkPartitionDetection(); |
| System.setProperty("jgroups.resolve_dns", String.valueOf(!enableNetworkPartitionDetection)); |
| |
| InputStream is; |
| |
| String r; |
| if (config.isMcastEnabled()) { |
| r = JGROUPS_MCAST_CONFIG_FILE_NAME; |
| } else { |
| r = DEFAULT_JGROUPS_TCP_CONFIG; |
| } |
| is = ClassPathLoader.getLatest().getResourceAsStream(getClass(), r); |
| if (is == null) { |
| throw new GemFireConfigException( |
| String.format("Cannot find %s", r)); |
| } |
| |
| String properties; |
| try { |
| StringBuilder sb = new StringBuilder(3000); |
| BufferedReader br; |
| br = new BufferedReader(new InputStreamReader(is, "US-ASCII")); |
| String input; |
| while ((input = br.readLine()) != null) { |
| sb.append(input); |
| } |
| br.close(); |
| properties = sb.toString(); |
| } catch (Exception ex) { |
| throw new GemFireConfigException( |
| "An Exception was thrown while reading JGroups config.", |
| ex); |
| } |
| |
| if (properties.startsWith("<!--")) { |
| int commentEnd = properties.indexOf("-->"); |
| properties = properties.substring(commentEnd + 3); |
| } |
| |
| |
| if (config.isMcastEnabled()) { |
| properties = replaceStrings(properties, "MCAST_PORT", |
| String.valueOf(config.getMcastPort())); |
| properties = |
| replaceStrings(properties, "MCAST_ADDRESS", config.getMcastAddress().getHostAddress()); |
| properties = replaceStrings(properties, "MCAST_TTL", String.valueOf(config.getMcastTtl())); |
| properties = replaceStrings(properties, "MCAST_SEND_BUFFER_SIZE", |
| String.valueOf(config.getMcastSendBufferSize())); |
| properties = replaceStrings(properties, "MCAST_RECV_BUFFER_SIZE", |
| String.valueOf(config.getMcastRecvBufferSize())); |
| properties = replaceStrings(properties, "MCAST_RETRANSMIT_INTERVAL", "" + Integer |
| .getInteger(DistributionConfig.GEMFIRE_PREFIX + "mcast-retransmit-interval", 500)); |
| properties = replaceStrings(properties, "RETRANSMIT_LIMIT", |
| String.valueOf(config.getUdpFragmentSize() - 256)); |
| } |
| |
| if (config.isMcastEnabled() || config.isTcpDisabled() |
| || (config.getUdpRecvBufferSize() != DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE)) { |
| properties = |
| replaceStrings(properties, "UDP_RECV_BUFFER_SIZE", "" + config.getUdpRecvBufferSize()); |
| } else { |
| properties = replaceStrings(properties, "UDP_RECV_BUFFER_SIZE", |
| "" + DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE_REDUCED); |
| } |
| properties = |
| replaceStrings(properties, "UDP_SEND_BUFFER_SIZE", "" + config.getUdpSendBufferSize()); |
| |
| String str = config.getBindAddress(); |
| // JGroups UDP protocol requires a bind address |
| if (str == null || str.length() == 0) { |
| try { |
| str = SocketCreator.getLocalHost().getHostAddress(); |
| } catch (UnknownHostException e) { |
| throw new GemFireConfigException(e.getMessage(), e); |
| } |
| } |
| properties = replaceStrings(properties, "BIND_ADDR_SETTING", "bind_addr=\"" + str + "\""); |
| |
| int port = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "jg-bind-port", 0); |
| if (port != 0) { |
| properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_START", "" + port); |
| properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE", "" + 0); |
| } else { |
| int[] ports = config.getMembershipPortRange(); |
| properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_START", "" + ports[0]); |
| properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE", "" + (ports[1] - ports[0])); |
| } |
| |
| properties = replaceStrings(properties, "UDP_FRAGMENT_SIZE", "" + config.getUdpFragmentSize()); |
| |
| properties = replaceStrings(properties, "FC_MAX_CREDITS", |
| "" + config.getMcastByteAllowance()); |
| properties = replaceStrings(properties, "FC_THRESHOLD", |
| "" + config.getMcastRechargeThreshold()); |
| properties = replaceStrings(properties, "FC_MAX_BLOCK", |
| "" + config.getMcastRechargeBlockMs()); |
| |
| this.jgStackConfig = properties; |
| |
| if (!config.getSecurityUDPDHAlgo().isEmpty()) { |
| try { |
| this.encrypt = new GMSEncrypt(services, config.getSecurityUDPDHAlgo()); |
| logger.info("Initializing GMSEncrypt "); |
| } catch (Exception e) { |
| throw new GemFireConfigException("problem initializing encryption protocol", e); |
| } |
| } |
| } |
| |
| @Override |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings( |
| value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") |
| public void start() { |
| // create the configuration XML string for JGroups |
| String properties = this.jgStackConfig; |
| |
| long start = System.currentTimeMillis(); |
| |
| // start the jgroups channel and establish the membership ID |
| boolean reconnecting = false; |
| try { |
| Object oldDSMembershipInfo = services.getConfig().getOldDSMembershipInfo(); |
| if (oldDSMembershipInfo != null) { |
| logger.debug("Reusing JGroups channel from previous system", properties); |
| MembershipInformation oldInfo = (MembershipInformation) oldDSMembershipInfo; |
| myChannel = oldInfo.getChannel(); |
| usedMemberIdentifiers = oldInfo.getMembershipIdentifiers(); |
| queuedMessagesFromReconnect = oldInfo.getQueuedMessages(); |
| |
| // scrub the old channel |
| ViewId vid = new ViewId(new JGAddress(), 0); |
| List<Address> members = new ArrayList<>(); |
| members.add(new UUID(0, 0));// TODO open a JGroups JIRA for GEODE-3034 |
| View jgv = new View(vid, members); |
| this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv)); |
| // attempt to establish a new UUID in the jgroups channel so the member address will be |
| // different |
| try { |
| Method setAddressMethod = JChannel.class.getDeclaredMethod("setAddress"); |
| setAddressMethod.setAccessible(true); |
| setAddressMethod.invoke(myChannel); |
| } catch (SecurityException | NoSuchMethodException e) { |
| logger.warn("Unable to establish a new JGroups address. " |
| + "My address will be exactly the same as last time. Exception={}", |
| e.getMessage()); |
| } |
| reconnecting = true; |
| } else { |
| logger.debug("JGroups configuration: {}", properties); |
| |
| checkForIPv6(); |
| InputStream is = new ByteArrayInputStream(properties.getBytes("UTF-8")); |
| myChannel = new JChannel(is); |
| } |
| } catch (Exception e) { |
| throw new GemFireConfigException("unable to create jgroups channel", e); |
| } |
| |
| // give the stats to the jchannel statistics recorder |
| StatRecorder sr = (StatRecorder) myChannel.getProtocolStack().findProtocol(StatRecorder.class); |
| if (sr != null) { |
| sr.setServices(services); |
| } |
| |
| Transport transport = (Transport) myChannel.getProtocolStack().getTransport(); |
| transport.setMessenger(this); |
| |
| nackack2HeaderId = ClassConfigurator.getProtocolId(NAKACK2.class); |
| |
| try { |
| jgroupsReceiver = new JGroupsReceiver(); |
| setChannelReceiver(myChannel, jgroupsReceiver); |
| if (!reconnecting) { |
| myChannel.connect("AG"); // Apache Geode |
| } |
| } catch (Exception e) { |
| myChannel.close(); |
| throw new SystemConnectException("unable to create jgroups channel", e); |
| } |
| |
| if (JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK) { |
| JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK = false; |
| throw new SystemConnectException("failing for test"); |
| } |
| |
| establishLocalAddress(); |
| |
| logger.info("JGroups channel {} (took {}ms)", (reconnecting ? "reinitialized" : "created"), |
| System.currentTimeMillis() - start); |
| |
| } |
| |
| @Override |
| public boolean isOldMembershipIdentifier(MemberIdentifier id) { |
| return usedMemberIdentifiers.contains(id); |
| } |
| |
| /** |
| * JGroups picks an IPv6 address if preferIPv4Stack is false or not set and preferIPv6Addresses is |
| * not set or is true. We want it to use an IPv4 address for a dual-IP stack so that both IPv4 and |
| * IPv6 messaging work |
| */ |
| private void checkForIPv6() throws Exception { |
| boolean preferIpV6Addr = Boolean.getBoolean("java.net.preferIPv6Addresses"); |
| if (!preferIpV6Addr) { |
| logger.debug("forcing JGroups to think IPv4 is being used so it will choose an IPv4 address"); |
| Field m = org.jgroups.util.Util.class.getDeclaredField("ip_stack_type"); |
| m.setAccessible(true); |
| m.set(null, org.jgroups.util.StackType.IPv4); |
| } |
| } |
| |
| @Override |
| public void started() { |
| if (queuedMessagesFromReconnect != null && !services.getConfig().isUDPSecurityEnabled()) { |
| logger.info("Delivering {} messages queued by quorum checker", |
| queuedMessagesFromReconnect.size()); |
| for (Message message : queuedMessagesFromReconnect) { |
| jgroupsReceiver.receive(message, true); |
| } |
| queuedMessagesFromReconnect.clear(); |
| queuedMessagesFromReconnect = null; |
| } |
| } |
| |
| @Override |
| public void stop() { |
| if (localAddress != null && localAddress.getVmViewId() >= 0) { |
| // keep track of old addresses that were used to successfully join the cluster |
| usedMemberIdentifiers.add(localAddress); |
| } |
| if (this.myChannel != null) { |
| if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled()) |
| || services.getManager().isReconnectingDS()) { |
| // leave the channel open for reconnect attempts |
| } else { |
| this.myChannel.close(); |
| } |
| } |
| } |
| |
| @Override |
| public void stopped() {} |
| |
| @Override |
| public void memberSuspected(MemberIdentifier initiator, |
| MemberIdentifier suspect, String reason) {} |
| |
| @Override |
| public void installView(GMSMembershipView v) { |
| this.view = v; |
| |
| if (this.jgAddress.getVmViewId() < 0) { |
| this.jgAddress.setVmViewId(this.localAddress.getVmViewId()); |
| } |
| List<JGAddress> mbrs = v.getMembers().stream().map(JGAddress::new).collect(Collectors.toList()); |
| ViewId vid = new ViewId(new JGAddress(v.getCoordinator()), v.getViewId()); |
| View jgv = new View(vid, new ArrayList<>(mbrs)); |
| logger.trace("installing view into JGroups stack: {}", jgv); |
| this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv)); |
| |
| addressesWithIoExceptionsProcessed.clear(); |
| if (encrypt != null) { |
| encrypt.installView(v); |
| } |
| synchronized (scheduledMcastSeqnos) { |
| for (MemberIdentifier mbr : v.getCrashedMembers()) { |
| scheduledMcastSeqnos.remove(mbr); |
| } |
| for (MemberIdentifier mbr : v.getShutdownMembers()) { |
| scheduledMcastSeqnos.remove(mbr); |
| } |
| } |
| } |
| |
| |
| /** |
| * If JGroups is unable to send a message it may mean that the network is down. If so we need to |
| * initiate suspect processing on the recipient. |
| * <p> |
| * see Transport._send() |
| */ |
| @SuppressWarnings("UnusedParameters") |
| public void handleJGroupsIOException(IOException e, Address dest) { |
| if (services.getManager().shutdownInProgress()) { // GEODE-634 - don't log IOExceptions during |
| // shutdown |
| return; |
| } |
| GMSMembershipView v = this.view; |
| JGAddress jgMbr = (JGAddress) dest; |
| if (jgMbr != null && v != null) { |
| List<MemberIdentifier> members = v.getMembers(); |
| MemberIdentifier recipient = null; |
| for (MemberIdentifier gmsMbr : members) { |
| MemberData memberData = gmsMbr.getMemberData(); |
| if (jgMbr.getUUIDLsbs() == memberData.getUuidLeastSignificantBits() |
| && jgMbr.getUUIDMsbs() == memberData.getUuidMostSignificantBits() |
| && jgMbr.getVmViewId() == memberData.getVmViewId()) { |
| recipient = gmsMbr; |
| break; |
| } |
| } |
| if (recipient != null) { |
| if (!addressesWithIoExceptionsProcessed.contains(dest)) { |
| logger.warn("Unable to send message to " + recipient, e); |
| addressesWithIoExceptionsProcessed.add(dest); |
| } |
| // If communications aren't working we need to resolve the issue quickly, so here |
| // we initiate a final check. Prior to becoming open-source we did a similar check |
| // using JGroups VERIFY_SUSPECT |
| services.getHealthMonitor().checkIfAvailable(recipient, |
| "Unable to send messages to this member via JGroups", true); |
| } |
| } |
| } |
| |
| private void establishLocalAddress() { |
| UUID logicalAddress = (UUID) myChannel.getAddress(); |
| logicalAddress = logicalAddress.copy(); |
| |
| IpAddress ipaddr = (IpAddress) myChannel.down(new Event(Event.GET_PHYSICAL_ADDRESS)); |
| |
| if (ipaddr != null) { |
| this.jgAddress = new JGAddress(logicalAddress, ipaddr); |
| } else { |
| UDP udp = (UDP) myChannel.getProtocolStack().getTransport(); |
| |
| try { |
| Method getAddress = UDP.class.getDeclaredMethod("getPhysicalAddress"); |
| getAddress.setAccessible(true); |
| ipaddr = (IpAddress) getAddress.invoke(udp, new Object[0]); |
| this.jgAddress = new JGAddress(logicalAddress, ipaddr); |
| } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { |
| throw new InternalGemFireError( |
| "Unable to configure JGroups channel for membership communications", e); |
| } |
| } |
| |
| // install the address in the JGroups channel protocols |
| myChannel.down(new Event(Event.SET_LOCAL_ADDRESS, this.jgAddress)); |
| |
| MembershipConfig config = services.getConfig(); |
| boolean isLocator = (config |
| .getVmKind() == MemberIdentifier.LOCATOR_DM_TYPE) |
| || !config.getStartLocator().isEmpty(); |
| |
| // establish the DistributedSystem's address |
| String hostname = |
| SocketCreator.resolve_dns ? SocketCreator.getHostName(jgAddress.getInetAddress()) |
| : jgAddress.getInetAddress().getHostAddress(); |
| GMSMemberData gmsMember = new GMSMemberData(jgAddress.getInetAddress(), |
| hostname, jgAddress.getPort(), |
| OSProcess.getId(), (byte) services.getConfig().getVmKind(), |
| -1 /* directport */, -1 /* viewID */, config.getName(), |
| GMSUtil.parseGroups(config.getRoles(), config.getGroups()), config.getDurableClientId(), |
| config.getDurableClientTimeout(), |
| config.getEnableNetworkPartitionDetection(), isLocator, |
| Version.getCurrentVersion().ordinal(), |
| jgAddress.getUUIDMsbs(), jgAddress.getUUIDLsbs(), |
| (byte) (services.getConfig().getMemberWeight() & 0xff)); |
| localAddress = services.getMemberFactory().create(gmsMember); |
| logger.info("Established local address {}", localAddress); |
| services.setLocalAddress(localAddress); |
| } |
| |
| @Override |
| public void beSick() {} |
| |
| @Override |
| public void playDead() {} |
| |
| @Override |
| public void beHealthy() {} |
| |
| @Override |
| public <T> void addHandler(Class<T> c, MessageHandler<T> h) { |
| handlers.put(c, h); |
| } |
| |
| @Override |
| public boolean testMulticast(long timeout) throws InterruptedException { |
| long pongsSnapshot = pongsReceived.longValue(); |
| JGAddress dest = null; |
| try { |
| // noinspection ConstantConditions |
| pingPonger.sendPingMessage(myChannel, jgAddress, dest); |
| } catch (Exception e) { |
| logger.warn("unable to send multicast message: {}", |
| (jgAddress == null ? "multicast recipients" : jgAddress), e.getMessage()); |
| return false; |
| } |
| long giveupTime = System.currentTimeMillis() + timeout; |
| while (pongsReceived.longValue() == pongsSnapshot && System.currentTimeMillis() < giveupTime) { |
| Thread.sleep(100); |
| } |
| return pongsReceived.longValue() > pongsSnapshot; |
| } |
| |
| @Override |
| public void getMessageState(MemberIdentifier target, Map<String, Long> state, |
| boolean includeMulticast) { |
| if (includeMulticast) { |
| NAKACK2 nakack = (NAKACK2) myChannel.getProtocolStack().findProtocol("NAKACK2"); |
| if (nakack != null) { |
| long seqno = nakack.getCurrentSeqno(); |
| state.put("JGroups.mcastState", seqno); |
| } |
| } |
| } |
| |
| @Override |
| public void waitForMessageState(MemberIdentifier sender, Map<String, Long> state) |
| throws InterruptedException { |
| Long seqno = state.get("JGroups.mcastState"); |
| if (seqno == null) { |
| return; |
| } |
| long timeout = services.getConfig().getAckWaitThreshold() * 1000L; |
| long startTime = System.currentTimeMillis(); |
| long warnTime = startTime + timeout; |
| long quitTime = warnTime + timeout - 1000L; |
| boolean warned = false; |
| |
| for (;;) { |
| String received = "none"; |
| long highSeqno = 0; |
| synchronized (scheduledMcastSeqnos) { |
| MessageTracker tracker = scheduledMcastSeqnos.get(sender); |
| if (tracker == null) { // no longer in the membership view |
| break; |
| } |
| highSeqno = tracker.get(); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "waiting for multicast messages from {}. Current seqno={} and expected seqno={}", |
| sender, highSeqno, seqno); |
| } |
| if (highSeqno >= seqno) { |
| break; |
| } |
| long now = System.currentTimeMillis(); |
| if (!warned && now >= warnTime) { |
| warned = true; |
| received = String.valueOf(highSeqno); |
| logger.warn( |
| "{} seconds have elapsed while waiting for multicast messages from {}. Received {} but expecting at least {}.", |
| Long.toString((warnTime - startTime) / 1000L), sender, received, seqno); |
| } |
| if (now >= quitTime) { |
| throw new GemFireIOException("Multicast operations from " + sender |
| + " did not distribute within " + (now - startTime) + " milliseconds"); |
| } |
| Thread.sleep(50); |
| } |
| } |
| |
| @Override |
| public Set<MemberIdentifier> sendUnreliably(GMSMessage msg) { |
| return send(msg, false); |
| } |
| |
| @Override |
| public Set<MemberIdentifier> send(GMSMessage msg) { |
| return send(msg, true); |
| } |
| |
| private Set<MemberIdentifier> send(GMSMessage msg, boolean reliably) { |
| |
| // perform the same jgroups messaging as in 8.2's GMSMembershipManager.send() method |
| |
| // BUT: when marshalling messages we need to include the version of the product and |
| // localAddress at the beginning of the message. These should be used in the receiver |
| // code to create a versioned input stream, read the sender address, then read the message |
| // and set its sender address |
| MembershipStatistics theStats = services.getStatistics(); |
| GMSMembershipView oldView = this.view; |
| |
| if (!myChannel.isConnected()) { |
| logger.info("JGroupsMessenger channel is closed - messaging is not possible"); |
| throw new DistributedSystemDisconnectedException("Distributed System is shutting down"); |
| } |
| |
| filterOutgoingMessage(msg); |
| |
| List<MemberIdentifier> destinations = msg.getRecipients(); |
| boolean allDestinations = msg.forAll(); |
| |
| boolean useMcast = false; |
| if (services.getConfig().isMcastEnabled()) { |
| if (msg.getMulticast() || allDestinations) { |
| useMcast = services.getManager().isMulticastAllowed(); |
| } |
| } |
| |
| if (logger.isDebugEnabled() && reliably) { |
| String recips = useMcast ? "multicast" : destinations.toString(); |
| logger.debug("sending via JGroups: [{}] recipients: {}", msg, recips); |
| } |
| |
| JGAddress local = this.jgAddress; |
| |
| if (useMcast) { |
| |
| long startSer = theStats.startMsgSerialization(); |
| Message jmsg = |
| createJGMessage(msg, local, null, Version.getCurrentVersion().ordinal()); |
| theStats.endMsgSerialization(startSer); |
| |
| Exception problem; |
| try { |
| jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK); |
| if (!reliably) { |
| jmsg.setFlag(Message.Flag.NO_RELIABILITY); |
| } |
| theStats.incSentBytes(jmsg.getLength()); |
| logger.trace("Sending JGroups message: {}", jmsg); |
| myChannel.send(jmsg); |
| } catch (Exception e) { |
| logger.debug("caught unexpected exception", e); |
| Throwable cause = e.getCause(); |
| if (cause instanceof ForcedDisconnectException) { |
| problem = (Exception) cause; |
| } else { |
| problem = e; |
| } |
| if (services.getShutdownCause() != null) { |
| Throwable shutdownCause = services.getShutdownCause(); |
| // If ForcedDisconnectException occurred then report it as actual |
| // problem. |
| if (shutdownCause instanceof ForcedDisconnectException) { |
| problem = (Exception) shutdownCause; |
| } else { |
| Throwable ne = problem; |
| while (ne.getCause() != null) { |
| ne = ne.getCause(); |
| } |
| ne.initCause(services.getShutdownCause()); |
| } |
| } |
| final String channelClosed = |
| "Channel closed"; |
| throw new DistributedSystemDisconnectedException(channelClosed, problem); |
| } |
| } // useMcast |
| else { // ! useMcast |
| int len = destinations.size(); |
| List<MemberIdentifier> calculatedMembers; // explicit list of members |
| int calculatedLen; // == calculatedMembers.len |
| if (len == 1 && destinations.get(0) == ALL_RECIPIENTS) { // send to all |
| // Grab a copy of the current membership |
| GMSMembershipView v = services.getJoinLeave().getView(); |
| |
| // Construct the list |
| calculatedLen = v.size(); |
| calculatedMembers = new LinkedList<MemberIdentifier>(); |
| for (int i = 0; i < calculatedLen; i++) { |
| MemberIdentifier m = (MemberIdentifier) v.get(i); |
| calculatedMembers.add((MemberIdentifier) m); |
| } |
| } // send to all |
| else { // send to explicit list |
| calculatedLen = len; |
| calculatedMembers = new LinkedList<MemberIdentifier>(); |
| for (int i = 0; i < calculatedLen; i++) { |
| calculatedMembers.add((MemberIdentifier) destinations.get(i)); |
| } |
| } // send to explicit list |
| Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>(); |
| long startSer = theStats.startMsgSerialization(); |
| boolean firstMessage = true; |
| for (MemberIdentifier mbr : calculatedMembers) { |
| short version = mbr.getVersionOrdinal(); |
| if (!messages.containsKey(version)) { |
| Message jmsg = createJGMessage(msg, local, mbr, version); |
| messages.put(version, jmsg); |
| if (firstMessage) { |
| theStats.incSentBytes(jmsg.getLength()); |
| firstMessage = false; |
| } |
| } |
| } |
| theStats.endMsgSerialization(startSer); |
| Collections.shuffle(calculatedMembers); |
| int i = 0; |
| for (MemberIdentifier mbr : calculatedMembers) { |
| JGAddress to = new JGAddress(mbr); |
| short version = mbr.getVersionOrdinal(); |
| Message jmsg = messages.get(version); |
| Exception problem = null; |
| try { |
| Message tmp = (i < (calculatedLen - 1)) ? jmsg.copy(true) : jmsg; |
| if (!reliably) { |
| jmsg.setFlag(Message.Flag.NO_RELIABILITY); |
| } |
| tmp.setDest(to); |
| tmp.setSrc(this.jgAddress); |
| logger.trace("Unicasting to {}", to); |
| myChannel.send(tmp); |
| } catch (Exception e) { |
| problem = e; |
| } |
| if (problem != null) { |
| Throwable cause = services.getShutdownCause(); |
| if (cause != null) { |
| // If ForcedDisconnectException occurred then report it as actual |
| // problem. |
| if (cause instanceof ForcedDisconnectException) { |
| problem = (Exception) cause; |
| } else { |
| Throwable ne = problem; |
| while (ne.getCause() != null) { |
| ne = ne.getCause(); |
| } |
| ne.initCause(cause); |
| } |
| } |
| final String channelClosed = |
| "Channel closed"; |
| throw new DistributedSystemDisconnectedException(channelClosed, problem); |
| } |
| } // send individually |
| } // !useMcast |
| |
| // The contract is that every destination enumerated in the |
| // message should have received the message. If one left |
| // (i.e., left the view), we signal it here. |
| if (msg.forAll()) { |
| return Collections.emptySet(); |
| } |
| Set<MemberIdentifier> result = new HashSet<>(); |
| GMSMembershipView newView = this.view; |
| if (newView != null && newView != oldView) { |
| for (MemberIdentifier d : destinations) { |
| if (!newView.contains(d)) { |
| logger.debug("messenger: member has left the view: {} view is now {}", d, newView); |
| result.add(d); |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * This is the constructor to use to create a JGroups message holding a GemFire |
| * DistributionMessage. It sets the appropriate flags in the Message and properly serializes the |
| * DistributionMessage for the recipient's product version |
| * |
| * @param gfmsg the DistributionMessage |
| * @param src the sender address |
| * @param version the version of the recipient |
| * @return the new message |
| */ |
| Message createJGMessage(GMSMessage gfmsg, JGAddress src, MemberIdentifier dst, short version) { |
| gfmsg.registerProcessor(); |
| Message msg = new Message(); |
| msg.setDest(null); |
| msg.setSrc(src); |
| setMessageFlags(gfmsg, msg); |
| try { |
| long start = services.getStatistics().startMsgSerialization(); |
| BufferDataOutputStream out_stream = |
| new BufferDataOutputStream(Version.fromOrdinalNoThrow((short) version, false)); |
| Version.writeOrdinal(out_stream, |
| Version.getCurrentVersion().ordinal(), true); |
| if (encrypt != null) { |
| out_stream.writeBoolean(true); |
| writeEncryptedMessage(gfmsg, dst, version, out_stream); |
| } else { |
| out_stream.writeBoolean(false); |
| serializeMessage(gfmsg, out_stream); |
| } |
| |
| msg.setBuffer(out_stream.toByteArray()); |
| services.getStatistics().endMsgSerialization(start); |
| } catch (IOException | GemFireIOException ex) { |
| logger.warn("Error serializing message", ex); |
| if (ex instanceof GemFireIOException) { |
| throw (GemFireIOException) ex; |
| } else { |
| GemFireIOException ioe = new GemFireIOException("Error serializing message"); |
| ioe.initCause(ex); |
| throw ioe; |
| } |
| } catch (Exception ex) { |
| logger.warn("Error serializing message", ex); |
| GemFireIOException ioe = new GemFireIOException("Error serializing message"); |
| ioe.initCause(ex.getCause()); |
| throw ioe; |
| } |
| return msg; |
| } |
| |
| void writeEncryptedMessage(GMSMessage gfmsg, MemberIdentifier recipient, short version, |
| BufferDataOutputStream out) |
| throws Exception { |
| long start = services.getStatistics().startUDPMsgEncryption(); |
| try { |
| services.getSerializer().writeDSFIDHeader(gfmsg.getDSFID(), out); |
| byte[] pk = null; |
| int requestId = 0; |
| MemberIdentifier pkMbr = null; |
| switch (gfmsg.getDSFID()) { |
| case FIND_COORDINATOR_REQ: |
| case JOIN_REQUEST: |
| // need to append mine PK |
| pk = encrypt.getPublicKey(localAddress); |
| pkMbr = recipient; |
| requestId = getRequestId(gfmsg, pkMbr, true); |
| break; |
| case FIND_COORDINATOR_RESP: |
| case JOIN_RESPONSE: |
| pkMbr = recipient; |
| requestId = getRequestId(gfmsg, pkMbr, false); |
| default: |
| break; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("writeEncryptedMessage gfmsg.getDSFID() = {} for {} with requestid {}", |
| gfmsg.getDSFID(), pkMbr, requestId); |
| } |
| out.writeInt(requestId); |
| if (pk != null) { |
| StaticSerialization.writeByteArray(pk, out); |
| } |
| |
| BufferDataOutputStream out_stream = |
| new BufferDataOutputStream(Version.fromOrdinalNoThrow((short) version, false)); |
| byte[] messageBytes = serializeMessage(gfmsg, out_stream); |
| |
| if (pkMbr != null) { |
| // using members private key |
| messageBytes = encrypt.encryptData(messageBytes, pkMbr); |
| } else { |
| // using cluster secret key |
| messageBytes = encrypt.encryptData(messageBytes); |
| } |
| StaticSerialization.writeByteArray(messageBytes, out); |
| } finally { |
| services.getStatistics().endUDPMsgEncryption(start); |
| } |
| } |
| |
| int getRequestId(GMSMessage gfmsg, MemberIdentifier destination, boolean add) { |
| int requestId = 0; |
| if (gfmsg instanceof FindCoordinatorRequest) { |
| requestId = ((FindCoordinatorRequest) gfmsg).getRequestId(); |
| } else if (gfmsg instanceof JoinRequestMessage) { |
| requestId = ((JoinRequestMessage) gfmsg).getRequestId(); |
| } else if (gfmsg instanceof FindCoordinatorResponse) { |
| requestId = ((FindCoordinatorResponse) gfmsg).getRequestId(); |
| } else if (gfmsg instanceof JoinResponseMessage) { |
| requestId = ((JoinResponseMessage) gfmsg).getRequestId(); |
| } |
| |
| if (add) { |
| addRequestId(requestId, destination); |
| } |
| |
| return requestId; |
| } |
| |
| byte[] serializeMessage(GMSMessage gfmsg, BufferDataOutputStream out_stream) |
| throws IOException { |
| MemberIdentifier m = this.localAddress; |
| m.getMemberData().writeEssentialData(out_stream, |
| services.getSerializer().createSerializationContext(out_stream)); |
| services.getSerializer().getObjectSerializer() |
| .writeObject(services.getManager().unwrapMessage(gfmsg), out_stream); |
| |
| return out_stream.toByteArray(); |
| } |
| |
| void setMessageFlags(GMSMessage gfmsg, Message msg) { |
| // Bundling is mostly only useful if we're doing no-ack work, |
| // which is fairly rare |
| msg.setFlag(Flag.DONT_BUNDLE); |
| |
| if (gfmsg.isHighPriority() || AlertingAction.isThreadAlerting()) { |
| msg.setFlag(Flag.OOB); |
| msg.setFlag(Flag.NO_FC); |
| msg.setFlag(Flag.SKIP_BARRIER); |
| } |
| |
| msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); |
| } |
| |
| |
| /** |
| * deserialize a jgroups payload. If it's a DistributionMessage find the ID of the sender and |
| * establish it as the message's sender |
| */ |
| Object readJGMessage(Message jgmsg) { |
| Object result = null; |
| |
| int messageLength = jgmsg.getLength(); |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("deserializing a message of length " + messageLength); |
| } |
| |
| if (messageLength == 0) { |
| // jgroups messages with no payload are used for protocol interchange, such |
| // as STABLE_GOSSIP |
| logger.trace("message length is zero - ignoring"); |
| return null; |
| } |
| |
| Exception problem = null; |
| byte[] buf = jgmsg.getRawBuffer(); |
| try { |
| long start = services.getStatistics().startMsgDeserialization(); |
| |
| DataInputStream dis = |
| new DataInputStream(new ByteArrayInputStream(buf, jgmsg.getOffset(), jgmsg.getLength())); |
| |
| short ordinal = Version.readOrdinal(dis); |
| |
| if (ordinal < Version.getCurrentVersion().ordinal()) { |
| dis = new VersionedDataInputStream(dis, |
| Version.fromOrdinalNoThrow((short) ordinal, false)); |
| } |
| |
| // read |
| boolean isEncrypted = dis.readBoolean(); |
| |
| if (isEncrypted && encrypt == null) { |
| throw new GemFireConfigException("Got remote message as encrypted"); |
| } |
| |
| if (isEncrypted) { |
| result = readEncryptedMessage(dis, ordinal, encrypt); |
| } else { |
| result = deserializeMessage(dis, ordinal); |
| } |
| |
| |
| services.getStatistics().endMsgDeserialization(start); |
| } catch (ClassNotFoundException | IOException | RuntimeException e) { |
| problem = e; |
| } catch (Exception e) { |
| problem = e; |
| } |
| if (problem != null) { |
| logger.error(String.format("Exception deserializing message payload: %s", jgmsg), |
| problem); |
| return null; |
| } |
| |
| return result; |
| } |
| |
| void setSender(GMSMessage dm, MemberIdentifier m, short ordinal) { |
| MemberIdentifier sender = null; |
| // JoinRequestMessages are sent with an ID that may have been |
| // reused from a previous life by way of auto-reconnect, |
| // so we don't want to find a canonical reference for the |
| // request's sender ID |
| if (dm.getDSFID() == JOIN_REQUEST) { |
| sender = ((JoinRequestMessage) dm).getMemberID(); |
| } else { |
| sender = getMemberFromView(m, ordinal); |
| } |
| dm.setSender(sender); |
| } |
| |
| @SuppressWarnings("resource") |
| GMSMessage readEncryptedMessage(DataInputStream dis, short ordinal, |
| GMSEncrypt encryptLocal) throws Exception { |
| int dfsid = services.getSerializer().readDSFIDHeader(dis); |
| int requestId = dis.readInt(); |
| long start = services.getStatistics().startUDPMsgDecryption(); |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("readEncryptedMessage Reading Request id " + dfsid + " and requestid is " |
| + requestId + " myid " + this.localAddress); |
| } |
| MemberIdentifier pkMbr = null; |
| boolean readPK = false; |
| switch (dfsid) { |
| case FIND_COORDINATOR_REQ: |
| case JOIN_REQUEST: |
| readPK = true; |
| break; |
| case FIND_COORDINATOR_RESP: |
| case JOIN_RESPONSE: |
| // this will have requestId to know the PK |
| pkMbr = getRequestedMember(requestId); |
| break; |
| } |
| |
| byte[] data; |
| |
| byte[] pk = null; |
| |
| if (readPK) { |
| pk = StaticSerialization.readByteArray(dis); |
| data = StaticSerialization.readByteArray(dis); |
| // using prefixed pk from sender |
| data = encryptLocal.decryptData(data, pk); |
| } else { |
| data = StaticSerialization.readByteArray(dis); |
| // from cluster key |
| if (pkMbr != null) { |
| // using member public key |
| data = encryptLocal.decryptData(data, pkMbr); |
| } else { |
| // from cluster key |
| data = encryptLocal.decryptData(data); |
| } |
| } |
| |
| { |
| DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); |
| |
| if (ordinal < Version.getCurrentVersion().ordinal()) { |
| in = new VersionedDataInputStream(in, |
| Version.fromOrdinalNoThrow((short) ordinal, false)); |
| } |
| |
| GMSMessage result = deserializeMessage(in, ordinal); |
| |
| if (pk != null) { |
| logger.info("Setting public key for " + result.getSender() + " len " + pk.length); |
| setPublicKey(pk, (MemberIdentifier) result.getSender()); |
| } |
| |
| return result; |
| } |
| } catch (Exception e) { |
| throw new Exception("Message id is " + dfsid, e); |
| } finally { |
| services.getStatistics().endUDPMsgDecryption(start); |
| } |
| |
| } |
| |
| GMSMessage deserializeMessage(DataInputStream in, short ordinal) |
| throws ClassNotFoundException, IOException { |
| GMSMemberData info = new GMSMemberData(); |
| info.readEssentialData(in, services.getSerializer().createDeserializationContext(in)); |
| MemberIdentifier m = services.getMemberFactory().create(info); |
| GMSMessage result = services.getManager() |
| .wrapMessage(services.getSerializer().getObjectDeserializer().readObject(in)); |
| |
| setSender(result, m, ordinal); |
| |
| return result; |
| } |
| |
| /** look for certain messages that may need to be altered before being sent */ |
| void filterOutgoingMessage(GMSMessage m) { |
| switch (m.getDSFID()) { |
| case JOIN_RESPONSE: |
| JoinResponseMessage jrsp = (JoinResponseMessage) m; |
| |
| if (jrsp.getRejectionMessage() == null |
| && services.getConfig().isMulticastEnabled()) { |
| // get the multicast message digest and pass it with the join response |
| Digest digest = (Digest) this.myChannel.getProtocolStack().getTopProtocol() |
| .down(Event.GET_DIGEST_EVT); |
| BufferDataOutputStream hdos = new BufferDataOutputStream(500, Version.CURRENT); |
| try { |
| digest.writeTo(hdos); |
| } catch (Exception e) { |
| logger.fatal("Unable to serialize JGroups messaging digest", e); |
| } |
| jrsp.setMessengerData(hdos.toByteArray()); |
| } |
| break; |
| default: |
| break; |
| } |
| } |
| |
| void filterIncomingMessage(GMSMessage m) { |
| switch (m.getDSFID()) { |
| case JOIN_RESPONSE: |
| JoinResponseMessage jrsp = (JoinResponseMessage) m; |
| |
| if (jrsp.getRejectionMessage() == null |
| && services.getConfig().isMulticastEnabled()) { |
| byte[] serializedDigest = jrsp.getMessengerData(); |
| ByteArrayInputStream bis = new ByteArrayInputStream(serializedDigest); |
| DataInputStream dis = new DataInputStream(bis); |
| try { |
| Digest digest = new Digest(); |
| digest.readFrom(dis); |
| logger.trace("installing JGroups message digest {} from {}", digest, m); |
| this.myChannel.getProtocolStack().getTopProtocol() |
| .down(new Event(Event.MERGE_DIGEST, digest)); |
| jrsp.setMessengerData(null); |
| } catch (Exception e) { |
| logger.fatal("Unable to read JGroups messaging digest", e); |
| } |
| } |
| break; |
| default: |
| break; |
| } |
| } |
| |
| @Override |
| public MemberIdentifier getMemberID() { |
| return localAddress; |
| } |
| |
| /** |
| * returns the member ID for the given GMSMember object |
| */ |
| @SuppressWarnings("UnusedParameters") |
| private MemberIdentifier getMemberFromView(MemberIdentifier jgId, short version) { |
| return this.services.getJoinLeave().getMemberID(jgId); |
| } |
| |
| |
| @Override |
| public void emergencyClose() { |
| this.view = null; |
| if (localAddress.getVmViewId() >= 0) { |
| // keep track of old addresses that were used to successfully join the cluster |
| usedMemberIdentifiers.add(localAddress); |
| } |
| if (this.myChannel != null) { |
| if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled()) |
| || services.getManager().isReconnectingDS()) { |
| } else { |
| this.myChannel.disconnect(); |
| } |
| } |
| } |
| |
| @Override |
| public GMSQuorumChecker getQuorumChecker() { |
| GMSMembershipView view = this.view; |
| if (view == null) { |
| view = services.getJoinLeave().getView(); |
| if (view == null) { |
| view = services.getJoinLeave().getPreviousView(); |
| if (view == null) { |
| return null; |
| } |
| } |
| } |
| GMSQuorumChecker qc = |
| new GMSQuorumChecker(view, services.getConfig().getLossThreshold(), this.myChannel, |
| usedMemberIdentifiers); |
| qc.initialize(); |
| return qc; |
| } |
| |
| /** |
| * JGroupsReceiver receives incoming JGroups messages and passes them to a handler. It may be |
| * accessed through JChannel.getReceiver(). |
| */ |
| class JGroupsReceiver extends ReceiverAdapter { |
| |
| @Override |
| public void receive(Message jgmsg) { |
| receive(jgmsg, false); |
| } |
| |
| private void receive(Message jgmsg, boolean fromQuorumChecker) { |
| long startTime = services.getStatistics().startUDPDispatchRequest(); |
| try { |
| if (services.getManager().shutdownInProgress()) { |
| return; |
| } |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders()); |
| } |
| |
| // Respond to ping messages sent from other systems that are in a auto reconnect state |
| byte[] contents = jgmsg.getBuffer(); |
| if (contents == null) { |
| return; |
| } |
| if (pingPonger.isPingMessage(contents)) { |
| try { |
| pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc()); |
| } catch (Exception e) { |
| logger.info("Failed sending Pong response to " + jgmsg.getSrc()); |
| } |
| return; |
| } else if (pingPonger.isPongMessage(contents)) { |
| pongsReceived.incrementAndGet(); |
| return; |
| } |
| |
| Object o = readJGMessage(jgmsg); |
| if (o == null) { |
| return; |
| } |
| |
| GMSMessage msg = services.getManager().wrapMessage(o); |
| |
| // admin-only VMs don't have caches, so we ignore cache operations |
| // multicast to them, avoiding deserialization cost and classpath |
| // problems |
| if ((services.getConfig() |
| .getVmKind() == MemberIdentifier.ADMIN_ONLY_DM_TYPE) |
| && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) { |
| return; |
| } |
| |
| msg.resetTimestamp(); |
| msg.setBytesRead(jgmsg.getLength()); |
| |
| try { |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender()); |
| } |
| filterIncomingMessage(msg); |
| MessageHandler handler = getMessageHandler(msg); |
| if (fromQuorumChecker && handler instanceof HealthMonitor) { |
| // ignore suspect / heartbeat messages that happened during |
| // auto-reconnect because they very likely have old member IDs in them |
| } else { |
| handler.processMessage(msg); |
| } |
| |
| // record the scheduling of broadcast messages |
| NakAckHeader2 header = (NakAckHeader2) jgmsg.getHeader(nackack2HeaderId); |
| if (header != null && !jgmsg.isFlagSet(Flag.OOB)) { |
| recordScheduledSeqno(msg.getSender(), header.getSeqno()); |
| } |
| |
| } catch (MemberShunnedException e) { |
| // message from non-member - ignore |
| } |
| |
| } finally { |
| JGroupsMessenger.this.services.getStatistics().endUDPDispatchRequest(startTime); |
| } |
| } |
| |
| private void recordScheduledSeqno(MemberIdentifier member, long seqno) { |
| synchronized (scheduledMcastSeqnos) { |
| MessageTracker counter = scheduledMcastSeqnos.get(member); |
| if (counter == null) { |
| counter = new MessageTracker(seqno); |
| scheduledMcastSeqnos.put(member, counter); |
| } |
| counter.record(seqno); |
| } |
| } |
| |
| /** |
| * returns the handler that should process the given message. The default handler is the |
| * membership manager |
| */ |
| private MessageHandler getMessageHandler(GMSMessage msg) { |
| Class<?> msgClazz = msg.getClass(); |
| MessageHandler h = handlers.get(msgClazz); |
| if (h == null) { |
| for (Class<?> clazz : handlers.keySet()) { |
| if (clazz.isAssignableFrom(msgClazz)) { |
| h = handlers.get(clazz); |
| handlers.put(msg.getClass(), h); |
| break; |
| } |
| } |
| } |
| if (h == null) { |
| h = services.getManager(); |
| } |
| return h; |
| } |
| } |
| |
| @Override |
| public Set<MemberIdentifier> send(GMSMessage msg, GMSMembershipView alternateView) { |
| if (this.encrypt != null) { |
| this.encrypt.installView(alternateView); |
| } |
| return send(msg, true); |
| } |
| |
| @Override |
| public byte[] getPublicKey(MemberIdentifier mbr) { |
| if (encrypt != null) { |
| return encrypt.getPublicKey(mbr); |
| } |
| return null; |
| } |
| |
| @Override |
| public void setPublicKey(byte[] publickey, MemberIdentifier mbr) { |
| if (encrypt != null) { |
| logger.debug("Setting PK for member " + mbr); |
| encrypt.setPublicKey(publickey, mbr); |
| } |
| } |
| |
| @Override |
| public void setClusterSecretKey(byte[] clusterSecretKey) { |
| if (encrypt != null) { |
| logger.debug("Setting cluster key"); |
| encrypt.setClusterKey(clusterSecretKey); |
| } |
| } |
| |
| @Override |
| public byte[] getClusterSecretKey() { |
| if (encrypt != null) { |
| return encrypt.getClusterSecretKey(); |
| } |
| return null; |
| } |
| |
| private AtomicInteger requestId = new AtomicInteger((new Random().nextInt())); |
| private HashMap<Integer, MemberIdentifier> requestIdVsRecipients = new HashMap<>(); |
| |
| MemberIdentifier getRequestedMember(int requestId) { |
| return requestIdVsRecipients.remove(requestId); |
| } |
| |
| void addRequestId(int requestId, MemberIdentifier mbr) { |
| requestIdVsRecipients.put(requestId, mbr); |
| } |
| |
| @Override |
| public int getRequestId() { |
| return requestId.incrementAndGet(); |
| } |
| |
| @Override |
| public void initClusterKey() { |
| if (encrypt != null) { |
| try { |
| logger.info("Initializing cluster key"); |
| encrypt.initClusterSecretKey(); |
| } catch (Exception e) { |
| throw new RuntimeException("unable to create cluster key ", e); |
| } |
| } |
| } |
| |
| static class MessageTracker { |
| long highestSeqno; |
| |
| MessageTracker(long seqno) { |
| highestSeqno = seqno; |
| } |
| |
| long get() { |
| return highestSeqno; |
| } |
| |
| void record(long seqno) { |
| if (seqno > highestSeqno) { |
| highestSeqno = seqno; |
| } |
| } |
| } |
| } |