blob: d351d64b6ce863c77599a8ac9405c9306c36df7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms.locator;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipLocatorStatistics;
import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.tcpserver.HostAndPort;
import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.serialization.ObjectDeserializer;
import org.apache.geode.internal.serialization.ObjectSerializer;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;
public class MembershipLocatorImpl<ID extends MemberIdentifier> implements MembershipLocator<ID> {
private static final Logger logger = LogService.getLogger();
private final TcpServer server;
/**
* the TcpHandler used for peer location
*/
private final PrimaryHandler handler;
private final GMSLocator<ID> gmsLocator;
private final TcpClient locatorClient;
public MembershipLocatorImpl(int port, InetAddress bindAddress,
ProtocolChecker protocolChecker,
Supplier<ExecutorService> executorServiceSupplier,
TcpSocketCreator socketCreator,
ObjectSerializer objectSerializer,
ObjectDeserializer objectDeserializer,
TcpHandler fallbackHandler,
boolean locatorsAreCoordinators,
MembershipLocatorStatistics locatorStats, Path workingDirectory,
MembershipConfig config)
throws MembershipConfigurationException, UnknownHostException {
handler =
new PrimaryHandler(fallbackHandler, config.getLocatorWaitTime(),
() -> System.currentTimeMillis(), x -> Thread.sleep(x));
String host = bindAddress == null ? LocalHostUtil.getLocalHostName()
: bindAddress.getHostName();
String threadName = "Distribution Locator on " + host + ": " + port;
this.server = new TcpServer(port, bindAddress, handler,
threadName, protocolChecker,
locatorStats::getStatTime,
executorServiceSupplier,
socketCreator,
objectSerializer,
objectDeserializer,
GeodeGlossary.GEMFIRE_PREFIX + "TcpServer.READ_TIMEOUT",
GeodeGlossary.GEMFIRE_PREFIX + "TcpServer.BACKLOG");
locatorClient = new TcpClient(socketCreator,
objectSerializer,
objectDeserializer, Socket::new);
gmsLocator =
new GMSLocator<>(bindAddress, config.getLocators(), locatorsAreCoordinators,
config.isNetworkPartitionDetectionEnabled(),
locatorStats, config.getSecurityUDPDHAlgo(), workingDirectory, locatorClient,
objectSerializer,
objectDeserializer);
handler.addHandler(PeerLocatorRequest.class, gmsLocator);
handler.addHandler(FindCoordinatorRequest.class, gmsLocator);
handler.addHandler(GetViewRequest.class, gmsLocator);
}
@Override
public int start() throws IOException {
if (!isAlive()) {
server.start();
}
return getPort();
}
@Override
public boolean isAlive() {
return server.isAlive();
}
@Override
public int getPort() {
return server.getPort();
}
@Override
public boolean isShuttingDown() {
return server.isShuttingDown();
}
@Override
public void waitToShutdown(long waitTime) throws InterruptedException {
server.join(waitTime);
}
@Override
public void waitToShutdown() throws InterruptedException {
server.join();
}
@Override
public void restarting() throws IOException {
server.restarting();
}
@Override
public SocketAddress getSocketAddress() {
return server.getSocketAddress();
}
@Override
public void setMembership(final Membership<ID> membership) {
final GMSMembership<ID> gmsMembership = (GMSMembership<ID>) membership;
setServices(gmsMembership.getServices());
}
@Override
public void addHandler(Class<?> clazz, TcpHandler handler) {
this.handler.addHandler(clazz, handler);
}
@Override
public boolean isHandled(Class<?> clazz) {
return this.handler.isHandled(clazz);
}
@VisibleForTesting
public GMSLocator<ID> getGMSLocator() {
return this.gmsLocator;
}
/**
* Services is a class internal to the membership module. As such, the ability to setServices
* is available ony within the module. It's not part of the external API.
*/
public void setServices(final Services<ID> services) {
gmsLocator.setServices(services);
}
public void stop() {
if (isAlive()) {
logger.info("Stopping {}", this);
try {
locatorClient
.stop(
new HostAndPort(((InetSocketAddress) getSocketAddress()).getHostString(),
getPort()));
} catch (ConnectException ignore) {
// must not be running
}
boolean interrupted = Thread.interrupted();
long waitTimeMillis = TcpServer.SHUTDOWN_WAIT_TIME * 2;
try {
// TcpServer up to SHUTDOWN_WAIT_TIME for its executor pool to shut down.
// We wait 2 * SHUTDOWN_WAIT_TIME here to account for that shutdown, and then our own.
waitToShutdown(waitTimeMillis);
} catch (InterruptedException ex) {
interrupted = true;
logger.warn("Interrupted while stopping {}", this, ex);
// Continue running -- doing our best to stop everything...
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
if (isAlive()) {
logger.fatal("Could not stop {} in {} seconds", this,
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
}
}
}
@Override
public String toString() {
return "Locator on " + getSocketAddress() + ":" + getPort();
}
}