blob: f7e142fcd6a0b457e4a3894b2f2f323d41123ebc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal.locator.wan;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import org.apache.logging.log4j.Logger;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* An implementation of
* {@link org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListener}
*
*
*/
public class LocatorMembershipListenerImpl implements LocatorMembershipListener {
private static final Logger logger = LogService.getLogger();
static final int LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS = 3;
private static final String LOCATORS_DISTRIBUTOR_THREAD_NAME = "LocatorsDistributorThread";
private static final String LISTENER_FAILURE_MESSAGE =
"Locator Membership listener could not exchange locator information {}:{} with {}:{} after {} retry attempts";
private static final String LISTENER_FINAL_FAILURE_MESSAGE =
"Locator Membership listener permanently failed to exchange locator information {}:{} with {}:{} after {} retry attempts";
private int port;
private DistributionConfig config;
private final TcpClient tcpClient;
private final ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo =
new ConcurrentHashMap<>();
LocatorMembershipListenerImpl() {
this.tcpClient = new TcpClient();
}
LocatorMembershipListenerImpl(TcpClient tcpClient) {
this.tcpClient = tcpClient;
}
@Override
public void setPort(int port) {
this.port = port;
}
@Override
public void setConfig(DistributionConfig config) {
this.config = config;
}
Thread buildLocatorsDistributorThread(DistributionLocatorId localLocatorId,
Map<Integer, Set<DistributionLocatorId>> remoteLocators, DistributionLocatorId joiningLocator,
int joiningLocatorDistributedSystemId) {
Runnable distributeLocatorsRunnable =
new DistributeLocatorsRunnable(config.getMemberTimeout(), tcpClient, localLocatorId,
remoteLocators, joiningLocator, joiningLocatorDistributedSystemId);
ThreadFactory threadFactory = new LoggingThreadFactory(LOCATORS_DISTRIBUTOR_THREAD_NAME, true);
return threadFactory.newThread(distributeLocatorsRunnable);
}
/**
* When the new locator is added to remote locator metadata, inform all other locators in remote
* locator metadata about the new locator so that they can update their remote locator metadata.
*
* @param distributedSystemId Id of the joining locator.
* @param locator Id of the joining locator.
* @param sourceLocator Id of the locator that notified this locator about the new one.
*/
@Override
public void locatorJoined(final int distributedSystemId, final DistributionLocatorId locator,
final DistributionLocatorId sourceLocator) {
// DistributionLocatorId for local locator.
DistributionLocatorId localLocatorId;
String localLocator = config.getStartLocator();
if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
localLocatorId = new DistributionLocatorId(port, config.getBindAddress());
} else {
localLocatorId = new DistributionLocatorId(localLocator);
}
// Make a local copy of the current list of known locators.
ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = getAllLocatorsInfo();
Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<>();
for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) {
Set<DistributionLocatorId> value = new CopyOnWriteHashSet<>(entry.getValue());
localCopy.put(entry.getKey(), value);
}
// Remove locators that don't need to be notified (myself, the joining one and the one that
// notified myself).
List<DistributionLocatorId> ignoreList = Arrays.asList(locator, localLocatorId, sourceLocator);
for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : localCopy.entrySet()) {
for (DistributionLocatorId removeLocId : ignoreList) {
entry.getValue().remove(removeLocId);
}
}
// Launch Locators Distributor thread.
Thread locatorsDistributorThread =
buildLocatorsDistributorThread(localLocatorId, localCopy, locator, distributedSystemId);
locatorsDistributorThread.start();
}
@Override
public Object handleRequest(Object request) {
Object response = null;
if (request instanceof RemoteLocatorJoinRequest) {
response = updateAllLocatorInfo((RemoteLocatorJoinRequest) request);
} else if (request instanceof LocatorJoinMessage) {
response = informAboutRemoteLocators((LocatorJoinMessage) request);
} else if (request instanceof RemoteLocatorPingRequest) {
response = getPingResponse((RemoteLocatorPingRequest) request);
} else if (request instanceof RemoteLocatorRequest) {
response = getRemoteLocators((RemoteLocatorRequest) request);
}
return response;
}
/**
* A locator from the request is checked against the existing remote locator metadata. If it is
* not available then added to existing remote locator metadata and LocatorMembershipListener is
* invoked to inform about the this newly added locator to all other locators available in remote
* locator metadata. As a response, remote locator metadata is sent.
*
*/
private synchronized Object updateAllLocatorInfo(RemoteLocatorJoinRequest request) {
int distributedSystemId = request.getDistributedSystemId();
DistributionLocatorId locator = request.getLocator();
LocatorHelper.addLocator(distributedSystemId, locator, this, null);
return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo());
}
@SuppressWarnings("unused")
private Object getPingResponse(RemoteLocatorPingRequest request) {
return new RemoteLocatorPingResponse();
}
private Object informAboutRemoteLocators(LocatorJoinMessage request) {
// TODO: FInd out the importance of list locatorJoinMessages. During
// refactoring I could not understand its significance
// synchronized (locatorJoinObject) {
// if (locatorJoinMessages.contains(request)) {
// return null;
// }
// locatorJoinMessages.add(request);
// }
int distributedSystemId = request.getDistributedSystemId();
DistributionLocatorId locator = request.getLocator();
DistributionLocatorId sourceLocatorId = request.getSourceLocator();
LocatorHelper.addLocator(distributedSystemId, locator, this, sourceLocatorId);
return null;
}
private Object getRemoteLocators(RemoteLocatorRequest request) {
int dsId = request.getDsId();
Set<String> locators = this.getRemoteLocatorInfo(dsId);
return new RemoteLocatorResponse(locators);
}
@Override
public Set<String> getRemoteLocatorInfo(int dsId) {
return this.allServerLocatorsInfo.get(dsId);
}
@Override
public ConcurrentMap<Integer, Set<DistributionLocatorId>> getAllLocatorsInfo() {
return this.allLocatorsInfo;
}
@Override
public ConcurrentMap<Integer, Set<String>> getAllServerLocatorsInfo() {
return this.allServerLocatorsInfo;
}
@Override
public void clearLocatorInfo() {
allLocatorsInfo.clear();
allServerLocatorsInfo.clear();
}
private static class DistributeLocatorsRunnable implements Runnable {
private final int memberTimeout;
private final TcpClient tcpClient;
private final DistributionLocatorId localLocatorId;
private final Map<Integer, Set<DistributionLocatorId>> remoteLocators;
private final DistributionLocatorId joiningLocator;
private final int joiningLocatorDistributedSystemId;
DistributeLocatorsRunnable(int memberTimeout,
TcpClient tcpClient,
DistributionLocatorId localLocatorId,
Map<Integer, Set<DistributionLocatorId>> remoteLocators,
DistributionLocatorId joiningLocator,
int joiningLocatorDistributedSystemId) {
this.memberTimeout = memberTimeout;
this.tcpClient = tcpClient;
this.localLocatorId = localLocatorId;
this.remoteLocators = remoteLocators;
this.joiningLocator = joiningLocator;
this.joiningLocatorDistributedSystemId = joiningLocatorDistributedSystemId;
}
void sendMessage(DistributionLocatorId targetLocator, LocatorJoinMessage locatorJoinMessage,
Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages) {
DistributionLocatorId advertisedLocator = locatorJoinMessage.getLocator();
try {
tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, memberTimeout,
false);
} catch (Exception exception) {
if (logger.isDebugEnabled()) {
logger.debug(LISTENER_FAILURE_MESSAGE,
new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
targetLocator.getHostName(), targetLocator.getPort(), 1, exception});
}
if (!failedMessages.containsKey(targetLocator)) {
failedMessages.put(targetLocator, new HashSet<>());
}
failedMessages.get(targetLocator).add(locatorJoinMessage);
}
}
boolean retryMessage(DistributionLocatorId targetLocator, LocatorJoinMessage locatorJoinMessage,
int retryAttempt) {
DistributionLocatorId advertisedLocator = locatorJoinMessage.getLocator();
try {
tcpClient.requestToServer(targetLocator.getHost(), locatorJoinMessage, memberTimeout,
false);
return true;
} catch (Exception exception) {
if (retryAttempt == LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS) {
logger.warn(LISTENER_FINAL_FAILURE_MESSAGE,
new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
targetLocator.getHostName(), targetLocator.getPort(), retryAttempt, exception});
} else {
if (logger.isDebugEnabled()) {
logger.debug(LISTENER_FAILURE_MESSAGE,
new Object[] {advertisedLocator.getHostName(), advertisedLocator.getPort(),
targetLocator.getHostName(), targetLocator.getPort(), retryAttempt, exception});
}
}
return false;
}
}
@Override
public void run() {
Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new HashMap<>();
for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) {
for (DistributionLocatorId value : entry.getValue()) {
// Notify known remote locator about the advertised locator.
LocatorJoinMessage advertiseNewLocatorMessage = new LocatorJoinMessage(
joiningLocatorDistributedSystemId, joiningLocator, localLocatorId, "");
sendMessage(value, advertiseNewLocatorMessage, failedMessages);
// Notify the advertised locator about remote known locator.
LocatorJoinMessage advertiseKnownLocatorMessage =
new LocatorJoinMessage(entry.getKey(), value, localLocatorId, "");
sendMessage(joiningLocator, advertiseKnownLocatorMessage, failedMessages);
}
}
// Retry failed messages and remove those that succeed.
if (!failedMessages.isEmpty()) {
for (int attempt = 1; attempt <= LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS; attempt++) {
for (Map.Entry<DistributionLocatorId, Set<LocatorJoinMessage>> entry : failedMessages
.entrySet()) {
DistributionLocatorId targetLocator = entry.getKey();
Set<LocatorJoinMessage> joinMessages = entry.getValue();
for (LocatorJoinMessage locatorJoinMessage : joinMessages) {
if (retryMessage(targetLocator, locatorJoinMessage, attempt)) {
joinMessages.remove(locatorJoinMessage);
} else {
// Sleep between retries.
try {
Thread.sleep(memberTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn(
"Locator Membership listener permanently failed to exchange locator information due to interruption.");
}
}
}
}
}
}
}
}
}