blob: c2ac20ee4f6748d1cf96f900d16baac5fb2268e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms.messenger;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.protocols.PingData;
import org.jgroups.protocols.TP;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Responses;
import org.apache.geode.distributed.internal.membership.gms.Services;
/**
 * JGroups will not send messages that have IpAddress destination addresses. Instead it requires a
 * "logical address" and requests physical addresses from the Discovery protocol that is normally
 * in a JGroups stack. Our stack does not include a Discovery protocol, so this protocol maintains
 * the mapping between logical and physical addresses instead.
 */
public class AddressManager extends Protocol {

  private static final Logger logger = Services.getLogger();

  /** Warning emitted (at most once) when the transport's address cache cannot be updated. */
  private static final String CACHE_UPDATE_WARNING =
      "Unable to update JGroups address cache - this may affect performance";

  /** Transport of this protocol's stack; resolved lazily by {@link #findPingDataMethod()}. */
  private TP transport;

  /** Reflective handle on {@code TP.setPingData(PingData)}; null until successfully resolved. */
  private Method setPingData;

  /**
   * Set once {@link #CACHE_UPDATE_WARNING} has been logged. It also prevents further attempts to
   * look up the reflective method after a failed lookup.
   */
  boolean warningLogged = false;

  /**
   * Answers {@code FIND_MBRS} events locally and passes every other event up the stack.
   *
   * <p>
   * For each requested address that is a {@link JGAddress}, a {@link PingData} carrying the
   * address's {@link IpAddress} form is added to the returned {@link Responses} and also pushed
   * into the transport's address cache. Requested addresses of any other type are skipped.
   *
   * @param evt the event travelling up the stack
   * @return the {@link Responses} built for a {@code FIND_MBRS} event, otherwise whatever the
   *         protocol above returns
   * @throws RuntimeException if building a response fails; the failure is logged and rethrown
   */
  @Override
  public Object up(Event evt) {
    if (evt.getType() != Event.FIND_MBRS) {
      return up_prot.up(evt);
    }

    // the suppression is scoped to this one cast rather than the whole method
    @SuppressWarnings("unchecked")
    List<Address> missing = (List<Address>) evt.getArg();
    Responses responses = new Responses(false);
    for (Address laddr : missing) {
      try {
        if (laddr instanceof JGAddress) {
          PingData pd = new PingData(laddr, true, laddr.toString(), newIpAddress(laddr));
          responses.addResponse(pd, false);
          updateUDPCache(pd);
        }
      } catch (RuntimeException e) {
        logger.warn("Unable to create PingData response", e);
        throw e;
      }
    }
    return responses;
  }

  /**
   * Converts a logical address into its physical form.
   *
   * @param jgaddr the logical address; must be a {@link JGAddress}
   * @return the result of {@link JGAddress#asIpAddress()}
   */
  private IpAddress newIpAddress(Address jgaddr) {
    JGAddress addr = (JGAddress) jgaddr;
    return addr.asIpAddress();
  }

  /**
   * update the logical->physical address cache in UDP, which doesn't seem to be updated by UDP
   * when processing responses from FIND_MBRS
   *
   * <p>
   * This is best-effort: if the reflective method cannot be found or invoked, a warning is logged
   * once and the cache is left alone.
   *
   * @param pd the mapping to push into the transport's cache
   */
  private void updateUDPCache(PingData pd) {
    if (setPingData == null && !warningLogged) {
      findPingDataMethod();
    }
    if (setPingData == null) {
      return;
    }
    try {
      setPingData.invoke(transport, pd);
    } catch (InvocationTargetException | IllegalAccessException e) {
      warnCacheUpdateFailed(e);
    }
  }

  /**
   * find and initialize the method used to update UDP's address cache
   *
   * <p>
   * The {@link #setPingData} field is only assigned once the method has been made accessible, so
   * a failure here never leaves a half-initialized handle behind.
   */
  private void findPingDataMethod() {
    transport = (TP) getProtocolStack().getTransport();
    try {
      Method method = TP.class.getDeclaredMethod("setPingData", PingData.class);
      method.setAccessible(true);
      setPingData = method;
    } catch (NoSuchMethodException | SecurityException e) {
      // a denied reflective access is treated like a missing method: skip the cache update
      warnCacheUpdateFailed(e);
    }
  }

  /**
   * Logs {@link #CACHE_UPDATE_WARNING} the first time it is called and does nothing afterwards.
   *
   * @param e the failure that prevented the cache update
   */
  private void warnCacheUpdateFailed(Exception e) {
    if (!warningLogged) {
      logger.warn(CACHE_UPDATE_WARNING, e);
      warningLogged = true;
    }
  }
}